原创

Spring Cloud Flow与 Apache Spark 集成

1.简介

Spring Cloud Data Flow是用于构建数据集成和实时数据处理管道的工具包。
在这种情况下,管道(Pipelines)是使用Spring Cloud StreamSpring Cloud Task框架构建的Spring Boot应用程序。

在本教程中,我们将展示如何将Spring Cloud Data FlowApache Spark一起使用。

2.本地数据流服务

首先,我们需要运行数据流服务器(Data Flow Server)才能部署我们的作业(jobs)。要在本地运行数据流服务器,需要使用spring-cloud-starter-dataflow-server-local依赖创建一个新项目:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

之后,使用@EnableDataFlowServer来注解服务中的主类(main class):

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

运行此应用程序后,本地数据流服务运行在端口9393

3.新建工程

我们将Spark Job作为本地单体应用程序创建,这样我们就不需要任何集群来运行它。

3.1依赖

首先,添加Spark依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2 创建job

job来说,就是为了求pi的近似值:

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. Data Flow Shell

Data Flow Shell是一个 允许我们与服务器交互的应用程序Shell使用DSL命令来描述数据流。

要使用Data Flow Shell,我们要创建一个运行它的项目。 首先,需要spring-cloud-dataflow-shell依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

添加依赖项后,我们可以创建主类来运行Data Flow Shell

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5.部署项目

为了部署我们的项目,可在三个版本(clusteryarnclient)中使用Apache Spark所谓任务运行器(task runner)—— 我们将使用client版本。任务运行器(task runner)是真正运行Spark job的实例。为此,我们首先需要使用Data Flow Shell注册task

app register --type task --name spark-client --uri 
maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

task允许我们指定多个不同的参数,其中一些参数是可选的,但是一些参数是正确部署Spark job所必需的:

  • spark.app-class,已提交job的主类
  • spark.app-jar,包含jobfat-jar路径
  • spark.app-name,job的名称
  • spark.app-args,将传递给job的参数
    我们可以使用注册的任务spark-client提交我们的工作,记住提供所需的参数:
    task create spark1 --definition "spark-client \
    --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
    --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
    

请注意,spark.app-jar是我们jobfat-jar的路径。成功创建任务后,我们可以使用以下命令继续运行它:

task launch spark1

这将调用task的执行。

6.总结

在本教程中,我们展示了如何使用Spring Cloud Data Flow框架来处理Apache Spark数据。 有关Spring Cloud Data Flow框架的更多信息,请参阅文档

所有代码示例都可以在GitHub上找到。

正文到此结束