
From Startup to Shutdown | SeaTunnel 2.1.1 Source Code Analysis


Light up a ⭐️ Star to light the road of open source

GitHub: https://github.com/apache/incubator-seatunnel


Table of Contents

This article is reposted from Adobee Chen's blog on CSDN. See whether it covers something you are interested in!

If anything is wrong, corrections are welcome.

I. Startup Script Analysis

II. Source Code Analysis

01 Entry Point

02 The execute() Core Method

  1. BaseSource, BaseTransform, and BaseSink are all interfaces that extend the Plugin interface; their implementation classes are the concrete plugin types.
  2. execute() moves on to create an execution environment.
  3. plugin.prepare(env) is called on every plugin.
  4. execution.start(sources, transforms, sinks) is invoked to run the Flink program.
  5. Finally, everything is closed.

I. Startup Script Analysis

In /bin/start-seatunnel-flink.sh:

#!/bin/bash

function usage() {
  echo "Usage: start-seatunnel-flink.sh [options]"
  echo "  options:"
  echo "    --config, -c FILE_PATH        Config file"
  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
  echo "    --check, -t                   Check config"
  echo "    --help, -h                    Show this help message"
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
  usage
  exit 0
fi

is_exist() {
    if [ -z $1 ]; then
      usage
      exit -1
    fi
}

PARAMS=""
while (( "$#" )); do
  case "$1" in
    -c|--config)
      CONFIG_FILE=$2
      is_exist ${CONFIG_FILE}
      shift 2
      ;;

    -i|--variable)
      variable=$2
      is_exist ${variable}
      java_property_value="-D${variable}"
      variables_substitution="${java_property_value} ${variables_substitution}"
      shift 2
      ;;

    *) # preserve positional arguments
      PARAMS="$PARAMS $1"
      shift
      ;;

  esac
done

if [ -z ${CONFIG_FILE} ]; then
  echo "Error: The following option is required: [-c | --config]"
  usage
  exit -1
fi

# set positional arguments in their proper place
eval set -- "$PARAMS"

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname ${BIN_DIR})
CONF_DIR=${APP_DIR}/config
PLUGINS_DIR=${APP_DIR}/lib
DEFAULT_CONFIG=${CONF_DIR}/application.conf
CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}

assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    source ${CONF_DIR}/seatunnel-env.sh
fi

string_trim() {
    echo $1 | awk '{$1=$1;print}'
}

export JVM_ARGS=$(string_trim "${variables_substitution}")

exec ${FLINK_HOME}/bin/flink run \
    ${PARAMS} \
    -c org.apache.seatunnel.SeatunnelFlink \
    ${assemblyJarName} --config ${CONFIG_FILE}

The startup script accepts the options --config, --variable, --check (not supported yet), and --help.

Any argument other than --config and --variable is preserved in PARAMS; at the end the script execs the flink run command, with PARAMS passed through as Flink CLI arguments.
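For example (a hypothetical invocation, with assumed paths), bin/start-seatunnel-flink.sh --config config/app.conf -i date=20220101 -m yarn-cluster would export -Ddate=20220101 via JVM_ARGS for variable substitution, while -m yarn-cluster falls through into PARAMS and is handed to flink run unchanged.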

The class org.apache.seatunnel.SeatunnelFlink is the main entry point.

II. Source Code Analysis

01 Entry Point

public class SeatunnelFlink {

    public static void main(String[] args) throws Exception {
        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs);
    }
}

The command-line arguments are parsed into a FlinkCommandArgs object:

public static FlinkCommandArgs parseFlinkArgs(String[] args) {
    FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
    JCommander.newBuilder()
        .addObject(flinkCommandArgs)
        .build()
        .parse(args);
    return flinkCommandArgs;
}
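JCommander binds the command-line options to annotated fields of the args object. The following is an illustrative sketch of such a class, mirroring the shell options above (the field names and annotations here are assumptions, not the verbatim 2.1.1 source):

import com.beust.jcommander.Parameter;

import java.util.Collections;
import java.util.List;

// Illustrative sketch: the real FlinkCommandArgs also extends CommandArgs and
// carries the engine/run-mode information that CommandFactory uses later.
public class FlinkCommandArgs {

    @Parameter(names = {"-c", "--config"}, description = "config file path", required = true)
    private String configFile;

    @Parameter(names = {"-i", "--variable"}, description = "variable substitution, e.g. -i city=beijing")
    private List<String> variables = Collections.emptyList();

    @Parameter(names = {"-t", "--check"}, description = "check the config without running the job")
    private boolean checkConfig = false;

    public String getConfigFile() {
        return configFile;
    }

    public boolean isCheckConfig() {
        return checkConfig;
    }
}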

Step into Seatunnel.run(flinkArgs).

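run() itself is short: based on the call flow described next, it obtains the engine-specific Command and executes it. A minimal sketch (the exact 2.1.1 signature may differ):

public static <T extends CommandArgs> void run(T commandArgs) throws Exception {
    Command<T> command = CommandFactory.createCommand(commandArgs);
    command.execute(commandArgs);
}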

Step into CommandFactory.createCommand(commandArgs), which picks a Command according to the engine type. Here we follow the Flink branch:

public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
    switch (commandArgs.getEngineType()) {
        case FLINK:
            return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
        case SPARK:
            return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
        default:
            throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
    }
}

Step into buildCommand, which returns a different implementation depending on whether the config is only to be validated:

public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
    return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
}

FlinkConfValidateCommand and FlinkTaskExecuteCommand both implement the Command interface, and each has just a single execute() method:

public class FlinkConfValidateCommand implements Command<FlinkCommandArgs>

public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment>
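Since each implementation supplies nothing beyond execute(), the Command contract is presumably as small as the following (a sketch, not the verbatim 2.1.1 source):

public interface Command<T extends CommandArgs> {
    void execute(T commandArgs);
}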

Back in SeaTunnel.run(flinkArgs), command.execute(commandArgs) is then invoked. Let's look first at the execute() method of FlinkTaskExecuteCommand.

02 The execute() Core Method

public void execute(FlinkCommandArgs flinkCommandArgs) {
    // engine type: FLINK
    EngineType engine = flinkCommandArgs.getEngineType();
    // --config: path to the config file
    String configFile = flinkCommandArgs.getConfigFile();
    // turn the config file String into a Config object
    Config config = new ConfigBuilder<>(configFile, engine).getConfig();
    // build the execution context
    ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
    // parse the source section
    List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
    // parse the transform section
    List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
    // parse the sink section
    List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();

    baseCheckConfig(sources, transforms, sinks);
    showAsciiLogo();

    try (Execution<BaseSource<FlinkEnvironment>,
            BaseTransform<FlinkEnvironment>,
            BaseSink<FlinkEnvironment>,
            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
        // prepare
        prepare(executionContext.getEnvironment(), sources, transforms, sinks);
        // start
        execution.start(sources, transforms, sinks);
        // close
        close(sources, transforms, sinks);
    } catch (Exception e) {
        throw new RuntimeException("Execute Flink task error", e);
    }
}

1. BaseSource, BaseTransform, and BaseSink are all interfaces that extend the Plugin interface; their implementation classes are the concrete plugin types.


If, say, both our source and sink are Kafka, the corresponding source implementation is KafkaTableStream and the sink is KafkaSink.
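For orientation, the Plugin contract that these three interfaces extend looks roughly like this (a simplified sketch; the actual 2.1.1 interface has a few more members, e.g. for config checking):

// Simplified sketch of the Plugin contract. Extending AutoCloseable is what
// lets the close() step in section 6 use try-with-resources on each plugin.
public interface Plugin<E extends RuntimeEnv> extends Serializable, AutoCloseable {
    void setConfig(Config pluginConfig); // receives this plugin's section of the user config
    Config getConfig();
    void prepare(E prepareEnv);          // one-time setup before the job graph is built
}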

2. execute() moves on to create an execution environment.


Step into createExecution() in ExecutionFactory:

public Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT> createExecution() {
    Execution execution = null;
    switch (executionContext.getEngine()) {
        case SPARK:
            SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                case STRUCTURED_STREAMING:
                    execution = new StructuredStreamingExecution(sparkEnvironment);
                    break;
                default:
                    execution = new SparkBatchExecution(sparkEnvironment);
            }
            break;
        case FLINK:
            FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                default:
                    execution = new FlinkBatchExecution(flinkEnvironment);
            }
            break;
        default:
            throw new IllegalArgumentException("No suitable engine");
    }
    LOGGER.info("current execution is [{}]", execution.getClass().getName());
    return (Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT>) execution;
}

Stepping into FlinkStreamExecution, we can see that it is built around the Flink execution environment:

private final FlinkEnvironment flinkEnvironment;

public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
    this.flinkEnvironment = streamEnvironment;
}
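FlinkEnvironment here is SeaTunnel's wrapper around Flink's native environments: judging from start() in step 5, it exposes at least the StreamExecutionEnvironment via getStreamExecutionEnvironment(), and presumably also the table environment that registerResultTable() uses to register streams as tables.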

3. Calling plugin.prepare(env)

protected final void prepare(E env, List<? extends Plugin>... plugins) {
    for (List<? extends Plugin> pluginList : plugins) {
        pluginList.forEach(plugin -> plugin.prepare(env));
    }
}

Take kafka -> kafka as an example.

KafkaTableStream.prepare():

public void prepare(FlinkEnvironment env) {
    topic = config.getString(TOPICS);
    PropertiesUtil.setProperties(config, kafkaParams, consumerPrefix, false);
    tableName = config.getString(RESULT_TABLE_NAME);
    if (config.hasPath(ROWTIME_FIELD)) {
        rowTimeField = config.getString(ROWTIME_FIELD);
        if (config.hasPath(WATERMARK_VAL)) {
            watermark = config.getLong(WATERMARK_VAL);
        }
    }
    String schemaContent = config.getString(SCHEMA);
    format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
    schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}

KafkaSink.prepare():

public void prepare(FlinkEnvironment env) {
    topic = config.getString("topics");
    if (config.hasPath("semantic")) {
        semantic = config.getString("semantic");
    }
    String producerPrefix = "producer.";
    PropertiesUtil.setProperties(config, kafkaParams, producerPrefix, false);
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}
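Note that both prepare() implementations only read and cache configuration: topics, schema, and the consumer./producer.-prefixed Kafka properties. No Kafka connection is opened here; the actual wiring into the job happens later in start().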

4. Starting execution.start(sources, transforms, sinks)

From step 2 we already know that execution is created differently per engine and job mode; for a streaming Kafka job it is FlinkStreamExecution, so we look for the start() method in FlinkStreamExecution.

5. Running the Flink program

Here source.getData() resolves to KafkaTableStream's getData() method, and on the sink side the work happens in KafkaSink's outputStream() method.

public void start(List<FlinkStreamSource> sources, List<FlinkStreamTransform> transforms, List<FlinkStreamSink> sinks) throws Exception {
    List<DataStream> data = new ArrayList<>();

    for (FlinkStreamSource source : sources) {
        DataStream dataStream = source.getData(flinkEnvironment);
        data.add(dataStream);
        registerResultTable(source, dataStream);
    }

    DataStream input = data.get(0);

    for (FlinkStreamTransform transform : transforms) {
        DataStream stream = fromSourceTable(transform.getConfig()).orElse(input);
        input = transform.processStream(flinkEnvironment, stream);
        registerResultTable(transform, input);
        transform.registerFunction(flinkEnvironment);
    }

    for (FlinkStreamSink sink : sinks) {
        DataStream stream = fromSourceTable(sink.getConfig()).orElse(input);
        sink.outputStream(flinkEnvironment, stream);
    }
    try {
        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
        throw e;
    }
}
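Note the fromSourceTable(...).orElse(input) pattern: each transform and sink first tries to resolve its input from a previously registered result table, and only falls back to the preceding operator's stream if no source table is configured. This is what lets plugins be chained either implicitly in order or explicitly by table name.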

6. Finally, close

protected final void close(List<? extends Plugin>... plugins) {
    PluginClosedException exceptionHolder = null;
    for (List<? extends Plugin> pluginList : plugins) {
        for (Plugin plugin : pluginList) {
            try (Plugin<?> closed = plugin) {
                // ignore
            } catch (Exception e) {
                exceptionHolder = exceptionHolder == null ?
                        new PluginClosedException("below plugins closed error:") : exceptionHolder;
                exceptionHolder.addSuppressed(new PluginClosedException(
                        String.format("plugin %s closed error", plugin.getClass()), e));
            }
        }
    }
    if (exceptionHolder != null) {
        throw exceptionHolder;
    }
}
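The try (Plugin<?> closed = plugin) construct with an empty body is a small idiom: assigning each plugin to a try-with-resources variable makes the JVM call its AutoCloseable close() automatically at the end of the block, and any close failures are accumulated as suppressed exceptions on a single PluginClosedException, so one failing plugin does not prevent the others from being closed.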

Apache SeaTunnel

Come grow with the community!

Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for synchronization and transformation of massive data, both offline and real-time.

Repository: https://github.com/apache/incubator-seatunnel

Website: https://seatunnel.apache.org/

Proposal: https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel (Incubating) 2.1.0 download: https://seatunnel.apache.org/download

We warmly welcome more people to join!

We believe that, guided by The Apache Way of "Community Over Code", "Open and Cooperation", "Meritocracy", and diversity with consensus decision-making, we will grow a more diverse and inclusive community ecosystem and jointly build the technical progress that the open source spirit brings!

We sincerely invite everyone committed to making homegrown open source succeed globally to join the SeaTunnel contributor family and build open source together!

Submit issues and suggestions: https://github.com/apache/incubator-seatunnel/issues

Contribute code: https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list: dev-subscribe@seatunnel.apache.org

Development mailing list: dev@seatunnel.apache.org

Join Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

Follow on Twitter: https://twitter.com/ASFSeaTunnel
