Flink task 配置并发数
in Tutorial with 0 comment
Flink task 配置并发数
in Tutorial with 0 comment

背景:我们用 Node.js 启多实例的时候,通过 pm2 cluster 模式启动多实例的方式提高服务的数据处理效率,Flink 也有类似的多实例的配置,但又有点不一样,这里单独列出来说一下,同时又结合 kafka 分区也简单说一下。


Flink 程序可以由不同的 task (如:transformations/opterators,data sources 及 data sinks等) 组成,一个 task 会分发到多个并发实例中运行,并且每个并发实例处理 task 的部分输入数据集。一个 task 的并发实例数叫做 parallelism。

如果你想使用savepoints的话,你需要设置一个最大并发数,当你从 savepoints 中重新获取数据时,你可以改变并发数,但是新的并发数必须大于先前的并发数。

配置 parallelism

task 的 parallelism 可以在 Flink 的不同级别上指定。

算子(operator)级别

每个 operator、data source 或者 data sink 都可以通过调用 setParallelism() 方法来指定parallelism,Scala 代码例如:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")

运行环境级别

Flink 程序是在一个运行环境的上下文中运行的。一个运行环境为每个 operator、data source 和data sink 的运行定义了一个默认的并发数。运行环境的并发数可以被每个算子确切的并发数配置所覆盖。

运行环境的默认并发数可以通过调用 setParallelism() 方法来指定。为了让所有的operator、data source和data sink以3个并发数来运行,你可按如下方法来设置运行环境的默认并发数,Scala 代码例如:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")

客户端级别

并发数可以在提交 Job 到 Flink 的客户端设置,客户端可以是 Java 或者 Scala 程序,典型的例子如: Flink 命令行接口(CLI)。
对于 CLI 客户端,并发参数可以通过 -p 来指定,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Scala 程序中,可以按如下方式指定:

try {
    val program = new PackagedProgram(file, args)
    val jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    val config = new Configuration()

    val client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader())

    // 将并发度设为10
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}

系统级别

影响所有运行环境的系统级别的默认并发度可以在./conf/flink-conf.yaml的parallelism.defaul项中指定。

kafka 分区数与 Flink parallelism

在 kafka 中,同一个消费组的不同消费者至少消费一个分区,不可能存在两个不同的消费者消费同一个分区的场景。Flink 的消费者数量依赖 flink parallelism(默认为1)

有下面三种场景:

1、kafka partitions == flink parallelism
这个场景是最理想的和最好的,每个消费者只关心一个分区。如果每个分区的消息是均衡的,则每个 flink 算子工作量也均衡。

2、kafka partitions < flink parallelism
这个场景下,有些 flink 实例将不会收到任何消息,实例空跑,占用资源。我们需要避免这种情况,我们可以在调用输入流之前执行重新平衡rebalance,例如:

inputStream = env.addSource(new FlinkKafkaConsumer10("topic", new SimpleStringSchema(), properties));

inputStream
    .rebalance()
    .map(s -> "message" + s)
    .print();

3、kafka partitions > flink parallelism
这个场景下,一些实例会处理多个分区的消息。同样地,我们也可以使用rebalance来避免这种情况。


收官。

Responses