Light up a ⭐️ Star to light the way for open source
GitHub: https://github.com/apache/incubator-seatunnel
Table of Contents
This article is reposted from Adobee Chen's blog on CSDN; see whether anything in it interests you!
If you spot any mistakes, corrections are welcome.
I. Startup Script Analysis
II. Source Code Analysis
01 Entry Point
02 The Core execute() Method
- BaseSource, BaseTransform, and BaseSink are all interfaces that implement the Plugin interface; their implementation classes are the concrete plugin types.
- Moving down through execute(), an execution environment is created.
- plugin.prepare(env) is called.
- execution.start(sources, transforms, sinks) is launched to run the Flink program.
- At the end, everything is closed.
I. Startup Script Analysis
In /bin/start-seatunnel-flink.sh:
```bash
#!/bin/bash

function usage() {
  echo "Usage: start-seatunnel-flink.sh [options]"
  echo "  options:"
  echo "    --config, -c FILE_PATH        Config file"
  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
  echo "    --check, -t                   Check config"
  echo "    --help, -h                    Show this help message"
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
  usage
  exit 0
fi

is_exist() {
  if [ -z $1 ]; then
    usage
    exit -1
  fi
}

PARAMS=""
while (( "$#" )); do
  case "$1" in
    -c|--config)
      CONFIG_FILE=$2
      is_exist ${CONFIG_FILE}
      shift 2
      ;;
    -i|--variable)
      variable=$2
      is_exist ${variable}
      java_property_value="-D${variable}"
      variables_substitution="${java_property_value} ${variables_substitution}"
      shift 2
      ;;
    *) # preserve positional arguments
      PARAMS="$PARAMS $1"
      shift
      ;;
  esac
done

if [ -z ${CONFIG_FILE} ]; then
  echo "Error: The following option is required: [-c | --config]"
  usage
  exit -1
fi

# set positional arguments in their proper place
eval set -- "$PARAMS"

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname ${BIN_DIR})
CONF_DIR=${APP_DIR}/config
PLUGINS_DIR=${APP_DIR}/lib
DEFAULT_CONFIG=${CONF_DIR}/application.conf
CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}

assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
  source ${CONF_DIR}/seatunnel-env.sh
fi

string_trim() {
  echo $1 | awk '{$1=$1;print}'
}

export JVM_ARGS=$(string_trim "${variables_substitution}")

exec ${FLINK_HOME}/bin/flink run ${PARAMS} \
  -c org.apache.seatunnel.SeatunnelFlink \
  ${assemblyJarName} --config ${CONFIG_FILE}
```
The startup script accepts --config, --variable, --check (listed in the usage text but not handled in the option loop, so not yet supported), and --help. Any argument that is not --config or --variable is collected into PARAMS; --variable values become -D Java properties exported as JVM_ARGS. At the end the script execs the flink run command, passing PARAMS through as Flink arguments. For example, ./bin/start-seatunnel-flink.sh --config ./config/application.conf -i date=20190318 ends up running ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.SeatunnelFlink on the seatunnel-core-flink assembly jar with --config ./config/application.conf.
The class org.apache.seatunnel.SeatunnelFlink is the main entry point.
II. Source Code Analysis
01 Entry Point
```java
public class SeatunnelFlink {

    public static void main(String[] args) throws Exception {
        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs);
    }
}
```
The command-line arguments are parsed into a FlinkCommandArgs object:
```java
public static FlinkCommandArgs parseFlinkArgs(String[] args) {
    FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
    JCommander.newBuilder()
            .addObject(flinkCommandArgs)
            .build()
            .parse(args);
    return flinkCommandArgs;
}
```
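JCommander does this binding through annotations. A hedged sketch of what FlinkCommandArgs plausibly looks like (the flag names mirror the startup-script options, but the exact fields in the real class are assumptions):

```java
import java.util.Collections;
import java.util.List;

import com.beust.jcommander.Parameter;

// Hedged sketch; flag names mirror the startup-script options, and the exact
// declarations in the real class are assumptions.
public class FlinkCommandArgs extends CommandArgs {

    @Parameter(names = {"-c", "--config"}, description = "Config file", required = true)
    private String configFile;

    @Parameter(names = {"-t", "--check"}, description = "Check config")
    private boolean checkConfig = false;

    @Parameter(names = {"-i", "--variable"}, description = "Variable substitution")
    private List<String> variables = Collections.emptyList();

    public String getConfigFile() { return configFile; }

    public boolean isCheckConfig() { return checkConfig; }

    // assumption: the Flink entry point always reports the FLINK engine type
    public EngineType getEngineType() { return EngineType.FLINK; }
}
```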
Next, step into Seatunnel.run(flinkArgs).
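The repost duplicates the previous snippet at this point; the body of Seatunnel.run itself is short. Reconstructed from the calls discussed next, it is roughly this (a sketch, not the verbatim source):

```java
// Hedged sketch of Seatunnel.run; the generics in the real source may differ.
public static <T extends CommandArgs> void run(T commandArgs) throws Exception {
    // pick the engine-specific Command (Flink or Spark) ...
    Command<T> command = CommandFactory.createCommand(commandArgs);
    // ... and hand the parsed arguments to it
    command.execute(commandArgs);
}
```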
Seatunnel.run calls CommandFactory.createCommand(commandArgs), which selects a Command based on the engine type. Here we follow the Flink path:
```java
public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
    switch (commandArgs.getEngineType()) {
        case FLINK:
            return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
        case SPARK:
            return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
        default:
            throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
    }
}
```
Step into buildCommand, which returns a different implementation depending on whether the config is only being checked:
```java
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
    return commandArgs.isCheckConfig()
            ? new FlinkConfValidateCommand()
            : new FlinkTaskExecuteCommand();
}
```
FlinkConfValidateCommand and FlinkTaskExecuteCommand both implement the Command interface, and each exposes a single execute() method:
```java
public class FlinkConfValidateCommand implements Command<FlinkCommandArgs>

public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment>
```
Back in **Seatunnel.run(flinkArgs)**, the built command is then executed:

```java
command.execute(commandArgs);
```
Let's look first at the execute() method of FlinkTaskExecuteCommand.
02 The Core execute() Method
```java
public void execute(FlinkCommandArgs flinkCommandArgs) {
    // flink
    EngineType engine = flinkCommandArgs.getEngineType();
    // --config
    String configFile = flinkCommandArgs.getConfigFile();
    // turn the config file path into a Config object
    Config config = new ConfigBuilder<>(configFile, engine).getConfig();
    // build the execution context
    ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
    // parse the source section
    List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
    // parse the transform section
    List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
    // parse the sink section
    List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();

    baseCheckConfig(sources, transforms, sinks);
    showAsciiLogo();

    try (Execution<BaseSource<FlinkEnvironment>, BaseTransform<FlinkEnvironment>,
            BaseSink<FlinkEnvironment>, FlinkEnvironment> execution =
                 new ExecutionFactory<>(executionContext).createExecution()) {
        // prepare
        prepare(executionContext.getEnvironment(), sources, transforms, sinks);
        // start
        execution.start(sources, transforms, sinks);
        // close
        close(sources, transforms, sinks);
    } catch (Exception e) {
        throw new RuntimeException("Execute Flink task error", e);
    }
}
```
1. BaseSource, BaseTransform, and BaseSink are all interfaces that implement the Plugin interface; their implementation classes are the concrete plugin types. If our source and sink are both Kafka, the source is KafkaTableStream and the sink is KafkaSink. A rough sketch of the Plugin contract is shown below.
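From the calls this walkthrough relies on (plugin.prepare(env) during preparation, getConfig() inside start(), and the try-with-resources in close()), the Plugin contract can be sketched as follows; this is a hedged reconstruction, and anything beyond prepare(), getConfig(), and close() is an assumption:

```java
import com.typesafe.config.Config; // assumption: the HOCON Config produced by ConfigBuilder

// Hedged sketch of the Plugin contract, inferred from how plugins are used in
// this article; the real interface may declare more (or differently named) methods.
public interface Plugin<E> extends AutoCloseable {

    // the plugin's own block from the job config file (assumed accessor)
    Config getConfig();

    // read and validate config values before the job starts
    void prepare(E env);

    // invoked via try-with-resources in close(sources, transforms, sinks)
    @Override
    default void close() throws Exception {
        // release resources; a no-op by default
    }
}
```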
2. Moving down through execute(), an execution environment is created. Step into createExecution() in ExecutionFactory:
```java
public Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT> createExecution() {
    Execution execution = null;
    switch (executionContext.getEngine()) {
        case SPARK:
            SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                case STRUCTURED_STREAMING:
                    execution = new StructuredStreamingExecution(sparkEnvironment);
                    break;
                default:
                    execution = new SparkBatchExecution(sparkEnvironment);
            }
            break;
        case FLINK:
            FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                default:
                    execution = new FlinkBatchExecution(flinkEnvironment);
            }
            break;
        default:
            throw new IllegalArgumentException("No suitable engine");
    }
    LOGGER.info("current execution is [{}]", execution.getClass().getName());
    return (Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT>) execution;
}
```
Step into FlinkStreamExecution, and you can see that it ultimately holds the Flink execution environment:
```java
private final FlinkEnvironment flinkEnvironment;

public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
    this.flinkEnvironment = streamEnvironment;
}
```
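FlinkEnvironment is the wrapper around Flink's own entry point. A minimal sketch of how the underlying StreamExecutionEnvironment is plausibly created (the field and lazy-init details are assumptions; the real class also applies job config such as parallelism and checkpointing):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch of FlinkEnvironment; only the accessor used later in start() is shown.
public class FlinkEnvironment {

    private StreamExecutionEnvironment environment;

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        if (environment == null) {
            // standard Flink API call that creates (or attaches to) the environment
            environment = StreamExecutionEnvironment.getExecutionEnvironment();
        }
        return environment;
    }
}
```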
3. plugin.prepare(env) is called:
```java
protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
    for (List<? extends Plugin<E>> pluginList : plugins) {
        pluginList.forEach(plugin -> plugin.prepare(env));
    }
}
```
For example, in a Kafka-to-Kafka job: KafkaTableStream's prepare():
```java
public void prepare(FlinkEnvironment env) {
    topic = config.getString(TOPICS);
    PropertiesUtil.setProperties(config, kafkaParams, consumerPrefix, false);
    tableName = config.getString(RESULT_TABLE_NAME);
    if (config.hasPath(ROWTIME_FIELD)) {
        rowTimeField = config.getString(ROWTIME_FIELD);
        if (config.hasPath(WATERMARK_VAL)) {
            watermark = config.getLong(WATERMARK_VAL);
        }
    }
    String schemaContent = config.getString(SCHEMA);
    format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
    schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}
```
KafkaSink's prepare():
```java
public void prepare(FlinkEnvironment env) {
    topic = config.getString("topics");
    if (config.hasPath("semantic")) {
        semantic = config.getString("semantic");
    }
    String producerPrefix = "producer.";
    PropertiesUtil.setProperties(config, kafkaParams, producerPrefix, false);
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}
```
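With topic, semantic, and kafkaParams now populated, the sink side can hand the stream to Flink's Kafka producer. A hedged sketch of what outputStream(...) plausibly does (the serialization schema and exact producer wiring are assumptions):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.types.Row;

// Hedged sketch; the real KafkaSink derives its serialization schema from the
// registered table schema, which is omitted here.
public void outputStream(FlinkEnvironment env, DataStream<Row> stream) {
    Properties props = new Properties();
    props.putAll(kafkaParams); // values collected in prepare() above

    FlinkKafkaProducer<Row> producer = new FlinkKafkaProducer<>(
            topic,
            rowSerializationSchema, // assumption: a KafkaSerializationSchema<Row>
            props,
            FlinkKafkaProducer.Semantic.valueOf(semantic.toUpperCase()));

    stream.addSink(producer);
}
```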
4. execution.start(sources, transforms, sinks) is launched. From step 2 we already know that execution is created differently per engine; for this streaming Kafka job it is FlinkStreamExecution, so the start() method to read is the one in FlinkStreamExecution.
5. The Flink program is executed. Inside start(), source.getData(...) resolves to KafkaTableStream's getData() method, and each sink is wired in through KafkaSink's outputStream() method.
```java
public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms,
                  List<FlinkStreamSink> sinks) throws Exception {
    List<DataStream> data = new ArrayList<>();

    for (FlinkStreamSource source : sources) {
        DataStream dataStream = source.getData(flinkEnvironment);
        data.add(dataStream);
        registerResultTable(source, dataStream);
    }

    DataStream input = data.get(0);

    for (FlinkStreamTransform transform : transforms) {
        DataStream stream = fromSourceTable(transform.getConfig()).orElse(input);
        input = transform.processStream(flinkEnvironment, stream);
        registerResultTable(transform, input);
        transform.registerFunction(flinkEnvironment);
    }

    for (FlinkStreamSink sink : sinks) {
        DataStream stream = fromSourceTable(sink.getConfig()).orElse(input);
        sink.outputStream(flinkEnvironment, stream);
    }

    try {
        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
        throw e;
    }
}
```
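For the source side that start() calls first, here is a hedged sketch of KafkaTableStream.getData(...); the JSON-to-Row conversion (toRow) and the type information (rowTypeInfo) are hypothetical placeholders for logic the real class builds from schemaInfo:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.types.Row;

// Hedged sketch: builds a Kafka consumer from the values set in prepare() and
// adds it as a Flink source; deserialization details are simplified.
public DataStream<Row> getData(FlinkEnvironment env) {
    Properties props = new Properties();
    props.putAll(kafkaParams);

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

    return env.getStreamExecutionEnvironment()
            .addSource(consumer)
            .map(json -> toRow(json, schemaInfo)) // hypothetical helper: parse JSON into a Row
            .returns(rowTypeInfo);                // assumption: TypeInformation<Row> derived from the schema
}
```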
6. Finally, everything is closed. Note the idiom below: each plugin is used as the resource of an otherwise empty try-with-resources block, so its close() is invoked and any failure is collected as a suppressed exception:
```java
protected final void close(List<? extends Plugin<E>>... plugins) {
    PluginClosedException exceptionHolder = null;
    for (List<? extends Plugin<E>> pluginList : plugins) {
        for (Plugin<?> plugin : pluginList) {
            try (Plugin<?> closed = plugin) {
                // ignore
            } catch (Exception e) {
                exceptionHolder = exceptionHolder == null
                        ? new PluginClosedException("below plugins closed error:")
                        : exceptionHolder;
                exceptionHolder.addSuppressed(new PluginClosedException(
                        String.format("plugin %s closed error", plugin.getClass()), e));
            }
        }
    }
    if (exceptionHolder != null) {
        throw exceptionHolder;
    }
}
```
Apache SeaTunnel
Come grow with the community!
Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronizing and transforming massive amounts of data, both offline and in real time.
Repository: https://github.com/apache/incubator-seatunnel
Website: https://seatunnel.apache.org/
**Proposal:** https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal
**Apache SeaTunnel (Incubating) 2.1.0 download:** https://seatunnel.apache.org/download
We warmly welcome more people to join!
We believe that, guided by The Apache Way of "Community Over Code", "Open and Cooperation", "Meritocracy", and "diversity and consensus decision-making", we will grow a more diverse and inclusive community ecosystem and advance technology through the spirit of open source!
We sincerely invite everyone who wants to help home-grown open source stand on the global stage to join the SeaTunnel contributor family and build open source together!
Submit issues and suggestions: https://github.com/apache/incubator-seatunnel/issues
Contribute code: https://github.com/apache/incubator-seatunnel/pulls
Subscribe to the community development mailing list: dev-subscribe@seatunnel.apache.org
**Development mailing list:** dev@seatunnel.apache.org
Join Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ
Follow on Twitter: https://twitter.com/ASFSeaTunnel