Java 中的数据流和函数式编程

了解如何在 Java 8 中使用流 API 和函数式编程结构。
119 位读者喜欢这篇文章。
computer screen

Opensource.com

Java SE 8(又名核心 Java 8)于 2014 年推出,它引入了一些从根本上影响其编程的更改。这些更改有两个紧密联系的部分:流 API 和函数式编程结构。本文使用从基础知识到高级功能的代码示例,介绍每个部分并说明它们之间的相互作用。

基础知识

流 API 是一种简洁且高级的方式来迭代数据序列中的元素。java.util.streamjava.util.function 包包含用于流 API 和相关函数式编程结构的新库。当然,一个代码示例胜过千言万语。

下面的代码片段用大约 2,000 个随机整数值填充一个 List

Random rand = new Random();
List<Integer> list = new ArrayList<Integer>();           // empty list
for (int i = 0; i < 2048; i++) list.add(rand.nextInt()); // populate it

可以使用另一个 for 循环迭代填充的列表,以将偶数值收集到另一个列表中。流 API 是一种更简洁的方式来做同样的事情

List <Integer> evens = list
   .stream()                      // streamify the list
   .filter(n -> (n & 0x1) == 0)   // filter out odd values
   .collect(Collectors.toList()); // collect even values

该示例有来自流 API 的三个函数

  • stream 函数可以将 Collection 转换为流,这是一个可以一次访问一个值的传送带。流化是惰性的(因此是高效的),因为这些值是根据需要生成的,而不是一次全部生成。

  • filter 函数确定哪些流式值(如果有)可以通过处理管道中的下一阶段,即 collect 阶段。filter 函数是高阶的,因为它的参数是一个函数——在本例中,是一个 lambda,它是一个未命名的函数,并且位于 Java 新的函数式编程结构的核心。

    lambda 语法与传统的 Java 大相径庭

    n -> (n & 0x1) == 0

    箭头(减号紧随其后的是大于号)将左侧的参数列表与右侧的函数体分开。参数 n 没有显式类型,尽管可以有;无论如何,编译器都会确定 n 是一个 Integer。如果有多个参数,这些参数将用括号括起来并用逗号分隔。

    在本例中,主体检查整数的最低位(最右边)是否为零,这表示一个偶数值。过滤器应返回布尔值。函数的正文中没有显式的 return,尽管可以有。如果主体没有显式的 return,则主体的最后一个表达式是返回的值。在本例中,以 lambda 编程的精神编写,主体由单个简单的布尔表达式 (n & 0x1) == 0 组成。

  • collect 函数将偶数值收集到一个列表,该列表的引用为 evens。如下面的示例所示,collect 函数是线程安全的,因此即使过滤操作在多个线程之间共享,它也能正确工作。

便捷函数和简单的多线程

在生产环境中,数据流可能以文件或网络连接作为其来源。为了学习流 API,Java 提供了诸如 IntStream 之类的类型,它可以生成具有各种类型元素的流。这是一个 IntStream 示例

IntStream                         // integer stream
   .range(1, 2048)                // generate a stream of ints in this range
   .parallel()                    // partition the data for multiple threads
   .filter(i -> ((i & 0x1) > 0))  // odd parity? pass through only odds
   .forEach(System.out::println); // print each

IntStream 类型包括一个 range 函数,该函数生成指定范围内的一系列整数值,在本例中,是从 1 到 2,048,增量为 1。parallel 函数自动将要完成的工作分配到多个线程中,每个线程都进行过滤和打印。(线程数通常与主机系统上的 CPU 数量匹配。)forEach 函数的参数是一个方法引用,在本例中,是对封装在 System.out 中的 println 方法的引用,其类型为 PrintStream。方法和构造函数引用的语法将在稍后讨论。

由于多线程,整数值总体上以任意顺序打印,但在给定线程中按顺序打印。例如,如果线程 T1 打印 409 和 411,则 T1 按 409-411 的顺序执行,但某个其他线程可能事先打印 2,045。parallel 调用后面的线程并发执行,因此它们的输出顺序是不确定的。

map/reduce 模式

map/reduce 模式在处理大型数据集时变得很流行。map/reduce 宏操作由两个微操作构建。数据首先被分散(映射)到各个工作者,然后将单独的结果收集在一起——可能作为一个单一的值,这将是归约。归约可以采用不同的形式,如下面的示例所示。

下面的 Number 类的实例表示具有 EVENODD 奇偶性的整数值

public class Number {
    enum Parity { EVEN, ODD }
    private int value;
    public Number(int n) { setValue(n); }
    public void setValue(int value) { this.value = value; }
    public int getValue() { return this.value; }
    public Parity getParity() {
        return ((value & 0x1) == 0) ? Parity.EVEN : Parity.ODD;
    }
    public void dump() {
        System.out.format("Value: %2d (parity: %s)\n", getValue(),
                          (getParity() == Parity.ODD ? "odd" : "even"));
    }
}

以下代码说明了如何使用 Number 流进行 map/reduce,从而表明流 API 不仅可以处理诸如 intfloat 之类的原始类型,还可以处理程序员定义的类类型。

在下面的代码片段中,使用 parallelStream 而不是 stream 函数对随机整数值列表进行流化。parallelStream 变体,就像之前介绍的 parallel 函数一样,执行自动多线程处理。

final int howMany = 200;
Random r = new Random();
Number[ ] nums = new Number[howMany];
for (int i = 0; i < howMany; i++) nums[i] = new Number(r.nextInt(100));
List<Number> listOfNums = Arrays.asList(nums);  // listify the array

Integer sum4All = listOfNums
   .parallelStream()           // automatic multi-threading
   .mapToInt(Number::getValue) // method reference rather than lambda
   .sum();                     // reduce streamed values to a single value
System.out.println("The sum of the randomly generated values is: " + sum4All);

高阶 mapToInt 函数可以将 lambda 作为参数,但在本例中,它采用方法引用,即 Number::getValuegetValue 方法不期望任何参数,并为其给定的 Number 实例返回其 int 值。语法很简单:类名 Number 后面跟一个双冒号和方法名称。回想一下之前的 System.out::println 示例,其中双冒号位于 System 类中的 static 字段 out 之后。

方法引用 Number::getValue 可以由下面的 lambda 替换。参数 n 是流中的 Number 实例之一

mapToInt(n -> n.getValue())

通常,lambda 和方法引用是可以互换的:如果诸如 mapToInt 之类的高阶函数可以将一种形式作为参数,那么该函数也可以采用另一种形式。这两种函数式编程结构具有相同的目的——对作为参数传入的数据执行一些自定义操作。在这两者之间进行选择通常是方便的问题。例如,lambda 可以在没有封装类的情况下编写,而方法则不能。我的习惯是使用 lambda,除非已经手头有适当的封装方法。

当前示例末尾的 sum 函数通过组合来自 parallelStream 线程的部分和来以线程安全的方式执行归约。但是,程序员有责任确保,在 parallelStream 调用引起的多线程过程中,程序员自己的函数调用(在本例中是对 getValue 的调用)是线程安全的。

最后一点值得强调。Lambda 语法鼓励编写纯函数,即其返回值仅取决于传入的参数(如果有)的函数;纯函数没有副作用,例如更新类中的 static 字段。纯函数因此是线程安全的,并且流 API 在传递给诸如 filtermap 之类的高阶函数的函数式参数是纯函数时效果最佳。

为了实现更细粒度的控制,还有一个名为 reduce 的流 API 函数,可用于对 Number 流中的值求和

Integer sum4AllHarder = listOfNums
   .parallelStream()                           // multi-threading
   .map(Number::getValue)                      // value per Number
   .reduce(0, (sofar, next) -> sofar + next);  // reduction to a sum

此版本的 reduce 函数接受两个参数,第二个参数是一个函数

  • 第一个参数(在本例中为零)是标识值,它用作归约操作的初始值,并在归约期间流运行干时用作默认值。
  • 第二个参数是累加器,在本例中,是一个带有两个参数的 lambda:第一个参数 (sofar) 是运行总和,第二个参数 (next) 是流中的下一个值。然后将运行总和和下一个值相加,以更新累加器。请记住,由于开始时的 parallelStream 调用,mapreduce 函数现在都在多线程上下文中执行。

到目前为止的示例中,流值被收集然后被归约,但是通常,流 API 中的 Collectors 可以累积值而不将其归约到单个值。集合活动可以产生任意丰富的数据结构,如下面的代码片段所示。该示例使用与前面示例相同的 listOfNums

Map<Number.Parity, List<Number>> numMap = listOfNums
   .parallelStream()
   .collect(Collectors.groupingBy(Number::getParity));

List<Number> evens = numMap.get(Number.Parity.EVEN);
List<Number> odds = numMap.get(Number.Parity.ODD);

第一行中的 numMap 指的是一个 Map,其键是 Number 奇偶校验 (ODDEVEN),其值是具有具有指定奇偶校验的值的 Number 实例的 List。同样,通过 parallelStream 调用进行多线程处理,然后 collect 调用(以线程安全的方式)将部分结果组装成 numMap 引用的单个 Map。然后对 numMap 调用两次 get 方法,一次用于获取 evens,第二次用于获取 odds

实用程序函数 dumpList 再次使用来自流 API 的高阶 forEach 函数

private void dumpList(String msg, List<Number> list) {
   System.out.println("\n" + msg);
   list.stream().forEach(n -> n.dump()); // or: forEach(Number::dump)
}

这是来自示例运行的程序输出的切片

The sum of the randomly generated values is: 3322
The sum again, using a different method:     3322

Evens:

Value: 72 (parity: even)
Value: 54 (parity: even)
...
Value: 92 (parity: even)

Odds:

Value: 35 (parity: odd)
Value: 37 (parity: odd)
...
Value: 41 (parity: odd)

用于简化代码的函数式结构

函数式结构(如方法引用和 lambda)非常适合流 API。这些结构代表了 Java 中高阶函数的主要简化。即使在过去,Java 在技术上通过 MethodConstructor 类型支持高阶函数,这些类型的实例可以作为参数传递给其他函数。这些类型被使用——但很少在生产级 Java 中使用,正是因为它们的复杂性。例如,调用 Method 需要对象引用(如果该方法不是 static)或至少需要类标识符(如果该方法是 static)。然后将调用的 Method 的参数作为 Object 实例传递给它,如果多态性(另一个复杂性!)不起作用,则可能需要显式向下转换。相比之下,lambda 和方法引用很容易作为参数传递给其他函数。

但是,新的函数式结构除了流 API 之外还有其他用途。考虑一个带有按钮的 Java GUI 程序,例如,供用户按下以获取当前时间。按钮按下的事件处理程序可以编写如下

JButton updateCurrentTime = new JButton("Update current time");
updateCurrentTime.addActionListener(new ActionListener() {
   @Override
   public void actionPerformed(ActionEvent e) {
      currentTime.setText(new Date().toString());
   }
});

这段简短的代码片段很难解释。考虑 addActionListener 方法中以如下方式开始的第二个参数

new ActionListener() {

这似乎是错误的,因为 ActionListener 是一个 abstract(抽象)接口,而 abstract(抽象)类型不能通过调用 new 来实例化。然而,事实证明,被实例化的完全是别的东西:一个实现该接口的未命名内部类。 如果上面的代码封装在一个名为 OldJava 的类中,那么这个未命名的内部类将被编译为 OldJava$1.classactionPerformed 方法在未命名的内部类中被重写。

现在考虑使用新的函数式结构进行的这种令人耳目一新的改变

updateCurrentTime.addActionListener(e -> currentTime.setText(new Date().toString()));

lambda 中的参数 e 是一个 ActionEvent 实例,而 lambda 的主体是对按钮 setText 的一个简单调用。

函数式接口和组合

到目前为止,使用的 lambda 都是就地编写的。 然而,为了方便起见,可以像引用封装方法一样引用 lambda。 以下一系列简短的示例说明了这一点。

考虑这个接口定义

@FunctionalInterface // optional, usually omitted
interface BinaryIntOp {
    abstract int compute(int arg1, int arg2); // abstract could be dropped
}

@FunctionalInterface 注解适用于任何声明单个抽象方法的接口; 在这种情况下,是 compute。 几个标准接口(例如,具有其单个声明方法 runRunnable 接口)符合要求。 在此示例中,compute 是声明的方法。 该接口可以用作引用声明中的目标类型

BinaryIntOp div = (arg1, arg2) -> arg1 / arg2;
div.compute(12, 3); // 4

java.util.function 提供了各种函数式接口。 一些示例如下。

下面的代码片段介绍了参数化的 Predicate 函数式接口。 在此示例中,带有参数 String 的类型 Predicate<String> 可以引用带有 String 参数的 lambda 或诸如 isEmptyString 方法。 通常,predicate(谓词)是返回布尔值的函数。

Predicate<String> pred = String::isEmpty; // predicate for a String method
String[ ] strings = {"one", "two", "", "three", "four"};
Arrays.asList(strings)
   .stream()
   .filter(pred)                  // filter out non-empty strings
   .forEach(System.out::println); // only the empty string is printed

只有当字符串的长度为零时,isEmpty 谓词才评估为 true; 因此,只有空字符串才能通过管道中的 forEach 阶段。

以下代码片段说明了如何将简单的 lambda 或方法引用组合成更丰富的 lambda 或方法引用。 考虑对 IntUnaryOperator 类型的引用进行的一系列赋值,该类型接受一个整数参数并返回一个整数值

IntUnaryOperator doubled = n -> n * 2;
IntUnaryOperator tripled = n -> n * 3;
IntUnaryOperator squared = n -> n * n;

IntUnaryOperator 是一个 FunctionalInterface,其唯一声明的方法是 applyAsInt。 现在,这三个引用 doubledtripledsquared 可以单独使用或以各种组合使用

int arg = 5;
doubled.applyAsInt(arg); // 10
tripled.applyAsInt(arg); // 15
squared.applyAsInt(arg); // 25

以下是一些示例组合

int arg = 5;
doubled.compose(squared).applyAsInt(arg); // doubled-the-squared: 50
tripled.compose(doubled).applyAsInt(arg); // tripled-the-doubled: 30
doubled.andThen(squared).applyAsInt(arg); // doubled-andThen-squared: 100
squared.andThen(tripled).applyAsInt(arg); // squared-andThen-tripled: 75

可以使用就地 lambda 进行组合,但引用使代码更简洁。

构造器引用

构造器引用是另一种函数式编程结构,但这些引用在比 lambda 和方法引用更微妙的上下文中很有用。 再次,一个代码示例似乎是最好的说明方式。

考虑这个 POJO

public class BedRocker { // resident of Bedrock
    private String name;
    public BedRocker(String name) { this.name = name; }
    public String getName() { return this.name; }
    public void dump() { System.out.println(getName()); }
}

该类有一个构造器,它需要一个 String 参数。 给定一个名称数组,目标是生成一个 BedRocker 元素数组,每个名称一个。 以下是使用函数式结构来完成此操作的代码段

String[ ] names = {"Fred", "Wilma", "Peebles", "Dino", "Baby Puss"};

Stream<BedRocker> bedrockers = Arrays.asList(names).stream().map(BedRocker::new);
BedRocker[ ] arrayBR = bedrockers.toArray(BedRocker[]::new);

Arrays.asList(arrayBR).stream().forEach(BedRocker::dump);

在高层次上,此代码段将名称转换为 BedRocker 数组元素。 详细地讲,代码的工作方式如下。 Stream 接口(在包 java.util.stream 中)可以参数化,在本例中,生成一个名为 bedrockersBedRocker 项目流。

再次使用 Arrays.asList 实用程序来流化数组 names,然后将每个流项目传递给 map 函数,该函数的参数现在是构造器引用 BedRocker::new。 此构造器引用充当对象工厂,通过在每次调用时生成和初始化 BedRocker 实例。 在执行第二行后,名为 bedrockers 的流由五个 BedRocker 项目组成。

可以通过关注高阶 map 函数来进一步阐明该示例。 在典型情况下,映射将一种类型的值(例如,int)转换为相同类型的不同值(例如,整数的后继者)

map(n -> n + 1) // map n to its successor

但是,在 BedRocker 示例中,转换更为显着,因为一种类型的值(表示名称的 String)映射到不同类型的值,在本例中,是一个以字符串作为其名称的 BedRocker 实例。 该转换通过构造器调用完成,构造器引用启用了该调用

map(BedRocker::new) // map a String to a BedRocker

传递给构造器的值是 names 数组中的一个名称。

此代码示例的第二行还说明了现在已为人熟知的将数组首先转换为 List 然后转换为 Stream 的转换

Stream<BedRocker> bedrockers = Arrays.asList(names).stream().map(BedRocker::new);

第三行反过来 - 通过调用带有数组构造器引用 BedRocker[]::newtoArray 方法,将流 bedrockers 转换为数组

BedRocker[ ] arrayBR = bedrockers.toArray(BedRocker[]::new);

此构造器引用不会创建单个 BedRocker 实例,而是创建这些实例的整个数组:构造器引用现在是 BedRocker[]::new 而不是 BedRocker::new。 为了确认,arrayBR 被转换为 List,然后再次被流化,以便可以使用 forEach 打印 BedRocker 名称

Fred
Wilma
Peebles
Dino
Baby Puss

该示例对数据结构的巧妙转换仅用几行代码即可完成,这突出了各种高阶函数的强大功能,这些高阶函数可以将 lambda、方法引用或构造器引用作为参数

柯里化 (Currying)

柯里化一个函数就是减少(通常减少一个)函数完成任何工作所需的显式参数的数量。 (该术语是为了纪念逻辑学家哈斯克尔·柯里。)通常,如果函数具有较少的参数,则更容易调用并且更健壮。 (回想一下一些噩梦般的函数,它需要六个左右的参数!)因此,柯里化应该被视为简化函数调用的努力。 java.util.function 包中的接口类型适合柯里化,如下一个示例所示。

IntBinaryOperator 接口类型的引用用于接受两个整数参数并返回一个整数值的函数

IntBinaryOperator mult2 = (n1, n2) -> n1 * n2;
mult2.applyAsInt(10, 20); // 200
mult2.applyAsInt(10, 30); // 300

引用名称 mult2 强调需要两个显式参数,在本示例中为 10 和 20。

先前介绍的 IntUnaryOperatorIntBinaryOperator 更简单,因为前者只需要一个参数,而后者需要两个参数。 两者都返回一个整数值。 因此,目标是将名为 mult2 的双参数 IntBinraryOperator 柯里化为一个单参数 IntUnaryOperator 版本 curriedMult2

考虑类型 IntFunction<R>。 此类型的函数接受一个整数参数并返回类型为 R 的结果,该结果可以是另一个函数 - 实际上是 IntBinaryOperator。 让 lambda 返回另一个 lambda 很简单

arg1 -> (arg2 -> arg1 * arg2) // parentheses could be omitted

完整的 lambda 以 arg1 开头,此 lambda 的主体(和返回值)是另一个 lambda,它以 arg2 开头。 返回的 lambda 仅接受一个参数 (arg2),但返回两个数字 (arg1arg2) 的乘积。 以下概述(后跟代码)应阐明这一点。

以下是关于如何柯里化 mult2 的概述

  • 类型为 IntFunction<IntUnaryOperator> 的 lambda 被编写并使用整数值(例如 10)调用。 返回的 IntUnaryOperator 缓存值 10,从而成为 mult2 的柯里化版本,在本例中为 curriedMult2
  • 然后使用单个显式参数(例如 20)调用 curriedMult2 函数,该参数与缓存的参数(在本例中为 10)相乘,以产生返回的乘积。

以下是代码中的详细信息

// Create a function that takes one argument n1 and returns a one-argument
// function n2 -> n1 * n2 that returns an int (the product n1 * n2).
IntFunction<IntUnaryOperator> curriedMult2Maker = n1 -> (n2 -> n1 * n2);

调用 curriedMult2Maker 会生成所需的 IntUnaryOperator 函数

// Use the curriedMult2Maker to get a curried version of mult2.
// The argument 10 is n1 from the lambda above.
IntUnaryOperator curriedMult2 = curriedMult2Maker2.apply(10);

值 10 现在缓存在 curriedMult2 函数中,以便 curriedMult2 调用中的显式整数参数将乘以 10

curriedMult2.applyAsInt(20); // 200 = 10 * 20
curriedMult2.applyAsInt(80); // 800 = 10 * 80

可以随意更改缓存值

curriedMult2 = curriedMult2Maker.apply(50); // cache 50
curriedMult2.applyAsInt(101);               // 5050 = 101 * 50

当然,可以以这种方式创建 mult2 的多个柯里化版本,每个版本都是 IntUnaryOperator

柯里化利用了 lambda 的强大功能:可以轻松编写 lambda 以返回所需的任何类型的值,包括另一个 lambda。

总结

Java 仍然是一种基于类的面向对象编程语言。 但凭借流 API 及其支持的函数式结构,Java 朝着 Lisp 等函数式语言迈出了决定性(且受欢迎的)一步。 结果是 Java 更适合处理现代编程中如此常见的大量数据流。 功能方向的这一步也使得更容易编写以先前代码示例中突出显示的管道样式编写清晰、简洁的 Java

dataStream
   .parallelStream() // multi-threaded for efficiency
   .filter(...)      // stage 1
   .map(...)         // stage 2
   .filter(...)      // stage 3
   ...
   .collect(...);    // or, perhaps, reduce: stage N

自动多线程,通过 parallelparallelStream 调用进行说明,建立在 Java 的 fork/join 框架之上,该框架支持任务窃取以提高效率。 假设 parallelStream 调用后面的线程池由八个线程组成,并且 dataStream 被划分为八种方式。 一些线程(例如,T1)可能比另一个线程(例如,T7)工作得更快,这意味着 T7 的某些任务应该移动到 T1 的工作队列中。 这会在运行时自动发生。

在这个简单的多线程世界中,程序员的主要责任是编写线程安全函数,作为参数传递给在流 API 中占主导地位的高阶函数。 特别是,Lambda 鼓励编写纯粹的(因此也是线程安全的)函数。

标签
User profile image.
我是计算机科学领域的学者(德保罗大学计算与数字媒体学院),在软件开发方面拥有丰富的经验,主要是在生产计划和调度(钢铁行业)以及产品配置(卡车和公共汽车制造)方面。 有关书籍和其他出版物的详细信息,请访问

3 条评论

柯里化是我在上述整篇文章中最喜欢的部分,用于开发我最喜欢的基于 java 的流式数据应用程序。 我确实有一个名为 tvtap pro 的应用程序,它流式传输媒体数据,并且我仅通过上述编码技巧进行开发。 感谢指南。 您可以在此处查看 https://tvtap-pro.net/ 并为我们提供真正的反馈,我们非常感谢您的努力。 谢谢。

杰出的文章。 我遇到的关于该主题的最易于理解和最全面的处理方法之一。 干得好!

很棒的文章,我完全喜欢

Creative Commons License本作品采用知识共享署名-相同方式共享 4.0 国际许可协议进行许可。
© . All rights reserved.