在 IBM Bluemix 云平臺上開發(fā)并部署您的下一個應用。 為什么需要 StreamStream 作為 Java 8 的一大亮點,它與 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大數據實時處理的 Stream。Java 8 中的 Stream 是對集合(Collection)對象功能的增強,它專注于對集合對象進行各種非常便利、高效的聚合操作(aggregate operation),或者大批量數據操作 (bulk data operation)。Stream API 借助于同樣新出現的 Lambda 表達式,極大的提高編程效率和程序可讀性。同時它提供串行和并行兩種模式進行匯聚操作,并發(fā)模式能夠充分利用多核處理器的優(yōu)勢,使用 fork/join 并行方式來拆分任務和加速處理過程。通常編寫并行代碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多線程的代碼,就可以很方便地寫出高性能的并發(fā)程序。所以說,Java 8 中首次出現的 java.util.stream 是一個函數式語言+多核時代綜合影響的產物。 什么是聚合操作在傳統的 J2EE 應用中,Java 代碼經常不得不依賴于關系型數據庫的聚合操作來完成諸如:
這類的操作。 但在當今這個數據大爆炸的時代,在數據來源多樣化、數據海量化的今天,很多時候不得不脫離 RDBMS,或者以底層返回的數據為基礎進行更上層的數據統計。而 Java 的集合 API 中,僅僅有極少量的輔助型方法,更多的時候是程序員需要用 Iterator 來遍歷集合,完成相關的聚合應用邏輯。這是一種遠不夠高效、笨拙的方法。在 Java 7 中,如果要發(fā)現 type 為 grocery 的所有交易,然后返回以交易值降序排序好的交易 ID 集合,我們需要這樣寫: 清單 1. Java 7 的排序、取值實現List<Transaction> groceryTransactions = new Arraylist<>(); for(Transaction t: transactions){ if(t.getType() == Transaction.GROCERY){ groceryTransactions.add(t); } } Collections.sort(groceryTransactions, new Comparator(){ public int compare(Transaction t1, Transaction t2){ return t2.getValue().compareTo(t1.getValue()); } }); List<Integer> transactionIds = new ArrayList<>(); for(Transaction t: groceryTransactions){ transactionsIds.add(t.getId()); } 而在 Java 8 使用 Stream,代碼更加簡潔易讀;而且使用并發(fā)模式,程序執(zhí)行速度更快。 清單 2. Java 8 的排序、取值實現List<Integer> transactionsIds = transactions.parallelStream(). filter(t -> t.getType() == Transaction.GROCERY). sorted(comparing(Transaction::getValue).reversed()). map(Transaction::getId). collect(toList()); Stream 總覽什么是流Stream 不是集合元素,它不是數據結構并不保存數據,它是有關算法和計算的,它更像一個高級版本的 Iterator。原始版本的 Iterator,用戶只能顯式地一個一個遍歷元素并對其執(zhí)行某些操作;高級版本的 Stream,用戶只要給出需要對其包含的元素執(zhí)行什么操作,比如 “過濾掉長度大于 10 的字符串”、“獲取每個字符串的首字母”等,Stream 會隱式地在內部進行遍歷,做出相應的數據轉換。 Stream 就如同一個迭代器(Iterator),單向,不可往復,數據只能遍歷一次,遍歷過一次后即用盡了,就好比流水從面前流過,一去不復返。 而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個 item 讀完后再讀下一個 item。而使用并行去遍歷時,數據會被分成多個段,其中每一個都在不同的線程中處理,然后將結果一起輸出。Stream 的并行操作依賴于 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的并行 API 演變歷程基本如下:
Stream 的另外一大特點是,數據源本身可以是無限的。 流的構成當我們使用一個流的時候,通常包括三個基本步驟: 獲取一個數據源(source)→ 數據轉換→執(zhí)行操作獲取想要的結果,每次轉換原有 Stream 對象不改變,返回一個新的 Stream 對象(可以有多次轉換),這就允許對其操作可以像鏈條一樣排列,變成一個管道,如下圖所示。 圖 1. 流管道 (Stream Pipeline) 的構成有多種方式生成 Stream Source:
流的操作類型分為兩種:
在對于一個 Stream 進行多次轉換操作 (Intermediate 操作),每次都對 Stream 的每個元素進行轉換,而且是執(zhí)行多次,這樣時間復雜度就是 N(轉換次數)個 for 循環(huán)里把所有操作都做掉的總和嗎?其實不是這樣的,轉換操作都是 lazy 的,多個轉換操作只會在 Terminal 操作的時候融合起來,一次循環(huán)完成。我們可以這樣簡單的理解,Stream 里有個操作函數的集合,每次轉換操作就是把轉換函數放入這個集合中,在 Terminal 操作的時候循環(huán) Stream 對應的集合,然后對每個元素執(zhí)行所有的函數。 還有一種操作被稱為 short-circuiting。用以指:
當操作一個無限大的 Stream,而又希望在有限時間內完成操作,則在管道內擁有一個 short-circuiting 操作是必要非充分條件。 清單 3. 一個流操作的示例int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum(); stream() 獲取當前小物件的 source,filter 和 mapToInt 為 intermediate 操作,進行數據篩選和轉換,最后一個 sum() 為 terminal 操作,對符合條件的全部小物件作重量求和。 流的使用詳解簡單說,對 Stream 的使用就是實現一個 filter-map-reduce 過程,產生一個最終結果,或者導致一個副作用(side effect)。 流的構造與轉換下面提供最常見的幾種構造 Stream 的樣例。 清單 4. 構造流的幾種常見方法// 1. Individual values Stream stream = Stream.of("a", "b", "c"); // 2. Arrays String [] strArray = new String[] {"a", "b", "c"}; stream = Stream.of(strArray); stream = Arrays.stream(strArray); // 3. Collections List<String> list = Arrays.asList(strArray); stream = list.stream(); 需要注意的是,對于基本數值型,目前有三種對應的包裝類型 Stream: IntStream、LongStream、DoubleStream。當然我們也可以用 Stream<Integer>、Stream<Long> >、Stream<Double>,但是 boxing 和 unboxing 會很耗時,所以特別為這三種基本數值型提供了對應的 Stream。 Java 8 中還沒有提供其它數值型 Stream,因為這將導致擴增的內容較多。而常規(guī)的數值型聚合運算可以通過上面三種 Stream 進行。 清單 5. 數值流的構造IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println); IntStream.range(1, 3).forEach(System.out::println); IntStream.rangeClosed(1, 3).forEach(System.out::println); 清單 6. 流轉換為其它數據結構// 1. Array String[] strArray1 = stream.toArray(String[]::new); // 2. Collection List<String> list1 = stream.collect(Collectors.toList()); List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new)); Set set1 = stream.collect(Collectors.toSet()); Stack stack1 = stream.collect(Collectors.toCollection(Stack::new)); // 3. String String str = stream.collect(Collectors.joining()).toString(); 一個 Stream 只可以使用一次,上面的代碼為了簡潔而重復使用了數次。 流的操作接下來,當把一個數據結構包裝成 Stream 后,就要開始對里面的元素進行各類操作了。常見的操作可以歸類如下。
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit 我們下面看一下 Stream 的比較典型用法。 map/flatMap 我們先來看 map。如果你熟悉 scala 這類函數式語言,對這個方法應該很了解,它的作用就是把 input Stream 的每一個元素,映射成 output Stream 的另外一個元素。 清單 7. 轉換大寫List<String> output = wordList.stream(). map(String::toUpperCase). collect(Collectors.toList()); 這段代碼把所有的單詞轉換為大寫。 清單 8. 平方數List<Integer> nums = Arrays.asList(1, 2, 3, 4); List<Integer> squareNums = nums.stream(). map(n -> n * n). collect(Collectors.toList()); 這段代碼生成一個整數 list 的平方數 {1, 4, 9, 16}。 從上面例子可以看出,map 生成的是個 1:1 映射,每個輸入元素,都按照規(guī)則轉換成為另外一個元素。還有一些場景,是一對多映射關系的,這時需要 flatMap。 清單 9. 一對多Stream<List<Integer>> inputStream = Stream.of( Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6) ); Stream<Integer> outputStream = inputStream. flatMap((childList) -> childList.stream()); flatMap 把 input Stream 中的層級結構扁平化,就是將最底層元素抽出來放到一起,最終 output 的新 Stream 里面已經沒有 List 了,都是直接的數字。 filter filter 對原始 Stream 進行某項測試,通過測試的元素被留下來生成一個新 Stream。 清單 10. 留下偶數Integer[] sixNums = {1, 2, 3, 4, 5, 6}; Integer[] evens = Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new); 經過條件“被 2 整除”的 filter,剩下的數字為 {2, 4, 6}。 清單 11. 把單詞挑出來List<String> output = reader.lines(). flatMap(line -> Stream.of(line.split(REGEXP))). filter(word -> word.length() > 0). collect(Collectors.toList()); 這段代碼首先把每行的單詞用 flatMap 整理到新的 Stream,然后保留長度不為 0 的,就是整篇文章中的全部單詞了。 forEach forEach 方法接收一個 Lambda 表達式,然后在 Stream 的每一個元素上執(zhí)行該表達式。 清單 12. 打印姓名(forEach 和 pre-java8 的對比)// Java 8 roster.stream() .filter(p -> p.getGender() == Person.Sex.MALE) .forEach(p -> System.out.println(p.getName())); // Pre-Java 8 for (Person p : roster) { if (p.getGender() == Person.Sex.MALE) { System.out.println(p.getName()); } } 對一個人員集合遍歷,找出男性并打印姓名??梢钥闯鰜?,forEach 是為 Lambda 而設計的,保持了最緊湊的風格。而且 Lambda 表達式本身是可以重用的,非常方便。當需要為多核系統優(yōu)化時,可以 parallelStream().forEach(),只是此時原有元素的次序沒法保證,并行的情況下將改變串行時操作的行為,此時 forEach 本身的實現不需要調整,而 Java8 以前的 for 循環(huán) code 可能需要加入額外的多線程邏輯。 但一般認為,forEach 和常規(guī) for 循環(huán)的差異不涉及到性能,它們僅僅是函數式風格與傳統 Java 風格的差別。 另外一點需要注意,forEach 是 terminal 操作,因此它執(zhí)行后,Stream 的元素就被“消費”掉了,你無法對一個 Stream 進行兩次 terminal 運算。下面的代碼是錯誤的: stream.forEach(element -> doOneThing(element)); stream.forEach(element -> doAnotherThing(element)); 相反,具有相似功能的 intermediate 操作 peek 可以達到上述目的。如下是出現在該 api javadoc 上的一個示例。 清單 13. peek 對每個元素執(zhí)行操作并返回一個新的 StreamStream.of("one", "two", "three", "four") .filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)) .map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)) .collect(Collectors.toList()); forEach 不能修改自己包含的本地變量值,也不能用 break/return 之類的關鍵字提前結束循環(huán)。 findFirst 這是一個 termimal 兼 short-circuiting 操作,它總是返回 Stream 的第一個元素,或者空。 這里比較重點的是它的返回值類型:Optional。這也是一個模仿 Scala 語言中的概念,作為一個容器,它可能含有某值,或者不包含。使用它的目的是盡可能避免 NullPointerException。 清單 14. Optional 的兩個用例String strA = " abcd ", strB = null; print(strA); print(""); print(strB); getLength(strA); getLength(""); getLength(strB); public static void print(String text) { // Java 8 Optional.ofNullable(text).ifPresent(System.out::println); // Pre-Java 8 if (text != null) { System.out.println(text); } } public static int getLength(String text) { // Java 8 return Optional.ofNullable(text).map(String::length).orElse(-1); // Pre-Java 8 // return if (text != null) ? text.length() : -1; }; 在更復雜的 if (xx != null) 的情況中,使用 Optional 代碼的可讀性更好,而且它提供的是編譯時檢查,能極大的降低 NPE 這種 Runtime Exception 對程序的影響,或者迫使程序員更早的在編碼階段處理空值問題,而不是留到運行時再發(fā)現和調試。 Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。還有例如 IntStream.average() 返回 OptionalDouble 等等。 reduce 這個方法的主要作用是把 Stream 元素組合起來。它提供一個起始值(種子),然后依照運算規(guī)則(BinaryOperator),和前面 Stream 的第一個、第二個、第 n 個元素組合。從這個意義上說,字符串拼接、數值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相當于 Integer sum = integers.reduce(0, (a, b) -> a+b); 或 Integer sum = integers.reduce(0, Integer::sum); 也有沒有起始值的情況,這時會把 Stream 的前面兩個元素組合起來,返回的是 Optional。 清單 15. reduce 的用例// 字符串連接,concat = "ABCD" String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat); // 求最小值,minValue = -3.0 double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min); // 求和,sumValue = 10, 有起始值 int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum); // 求和,sumValue = 10, 無起始值 sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get(); // 過濾,字符串連接,concat = "ace" concat = Stream.of("a", "B", "c", "D", "e", "F"). filter(x -> x.compareTo("Z") > 0). reduce("", String::concat); 上面代碼例如第一個示例的 reduce(),第一個參數(空白字符)即為起始值,第二個參數(String::concat)為 BinaryOperator。這類有起始值的 reduce() 都返回具體的對象。而對于第四個示例沒有起始值的 reduce(),由于可能沒有足夠的元素,返回的是 Optional,請留意這個區(qū)別。 limit/skip limit 返回 Stream 的前面 n 個元素;skip 則是扔掉前 n 個元素(它是由一個叫 subStream 的方法改名而來)。 清單 16. limit 和 skip 對運行次數的影響public void testLimitAndSkip() { List<Person> persons = new ArrayList(); for (int i = 1; i <= 10000; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<String> personList2 = persons.stream(). map(Person::getName).limit(10).skip(3).collect(Collectors.toList()); System.out.println(personList2); } private class Person { public int no; private String name; public Person (int no, String name) { this.no = no; this.name = name; } public String getName() { System.out.println(name); return name; } } 輸出結果為: name1 name2 name3 name4 name5 name6 name7 name8 name9 name10 [name4, name5, name6, name7, name8, name9, name10] 這是一個有 10,000 個元素的 Stream,但在 short-circuiting 操作 limit 和 skip 的作用下,管道中 map 操作指定的 getName() 方法的執(zhí)行次數為 limit 所限定的 10 次,而最終返回結果在跳過前 3 個元素后只有后面 7 個返回。 有一種情況是 limit/skip 無法達到 short-circuiting 目的的,就是把它們放在 Stream 的排序操作后,原因跟 sorted 這個 intermediate 操作有關:此時系統并不知道 Stream 排序后的次序如何,所以 sorted 中的操作看上去就像完全沒有被 limit 或者 skip 一樣。 清單 17. limit 和 skip 對 sorted 后的運行次數無影響List<Person> persons = new ArrayList(); for (int i = 1; i <= 5; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<Person> personList2 = persons.stream().sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList()); System.out.println(personList2); 上面的示例對清單 13 做了微調,首先對 5 個元素的 Stream 排序,然后進行 limit 操作。輸出結果為: name2 name1 name3 name2 name4 name3 name5 name4 [stream.StreamDW$Person@816f27d, stream.StreamDW$Person@87aac27] 即雖然最后的返回元素數量是 2,但整個管道中的 sorted 表達式執(zhí)行次數沒有像前面例子相應減少。 最后有一點需要注意的是,對一個 parallel 的 Steam 管道來說,如果其元素是有序的,那么 limit 操作的成本會比較大,因為它的返回對象必須是前 n 個也有一樣次序的元素。取而代之的策略是取消元素間的次序,或者不要用 parallel Stream。 sorted 對 Stream 的排序通過 sorted 進行,它比數組的排序更強之處在于你可以首先對 Stream 進行各類 map、filter、limit、skip 甚至 distinct 來減少元素數量后,再排序,這能幫助程序明顯縮短執(zhí)行時間。我們對清單 14 進行優(yōu)化: 清單 18. 優(yōu)化:排序前進行 limit 和 skip結果會簡單很多: name2 name1 [stream.StreamDW$Person@6ce253f1, stream.StreamDW$Person@53d8d10a] 當然,這種優(yōu)化是有 business logic 上的局限性的:即不要求排序后再取值。 min/max/distinct min 和 max 的功能也可以通過對 Stream 元素先排序,再 findFirst 來實現,但前者的性能會更好,為 O(n),而 sorted 的成本是 O(n log n)。同時它們作為特殊的 reduce 方法被獨立出來也是因為求最大最小值是很常見的操作。 清單 19. 找出最長一行的長度BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log")); int longest = br.lines(). mapToInt(String::length). max(). getAsInt(); br.close(); System.out.println(longest); 下面的例子則使用 distinct 來找出不重復的單詞。 清單 20. 找出全文的單詞,轉小寫,并排序List<String> words = br.lines(). flatMap(line -> Stream.of(line.split(" "))). filter(word -> word.length() > 0). map(String::toLowerCase). distinct(). sorted(). collect(Collectors.toList()); br.close(); System.out.println(words); Match Stream 有三個 match 方法,從語義上說:
它們都不是要遍歷全部元素才能返回結果。例如 allMatch 只要一個元素不滿足條件,就 skip 剩下的所有元素,返回 false。對清單 13 中的 Person 類稍做修改,加入一個 age 屬性和 getAge 方法。 清單 21. 使用 MatchList<Person> persons = new ArrayList(); persons.add(new Person(1, "name" + 1, 10)); persons.add(new Person(2, "name" + 2, 21)); persons.add(new Person(3, "name" + 3, 34)); persons.add(new Person(4, "name" + 4, 6)); persons.add(new Person(5, "name" + 5, 55)); boolean isAllAdult = persons.stream(). allMatch(p -> p.getAge() > 18); System.out.println("All are adult? " + isAllAdult); boolean isThereAnyChild = persons.stream(). anyMatch(p -> p.getAge() < 12); System.out.println("Any child? " + isThereAnyChild); 輸出結果: All are adult? false Any child? true 進階:自己生成流Stream.generate 通過實現 Supplier 接口,你可以自己來控制流的生成。這種情形通常用于隨機數、常量的 Stream,或者需要前后元素間維持著某種狀態(tài)信息的 Stream。把 Supplier 實例傳遞給 Stream.generate() 生成的 Stream,默認是串行(相對 parallel 而言)但無序的(相對 ordered 而言)。由于它是無限的,在管道中,必須利用 limit 之類的操作限制 Stream 大小。 清單 22. 生成 10 個隨機整數Random seed = new Random(); Supplier<Integer> random = seed::nextInt; Stream.generate(random).limit(10).forEach(System.out::println); //Another way IntStream.generate(() -> (int) (System.nanoTime() % 100)). limit(10).forEach(System.out::println); Stream.generate() 還接受自己實現的 Supplier。例如在構造海量測試數據的時候,用某種自動的規(guī)則給每一個變量賦值;或者依據公式計算 Stream 的每個元素值。這些都是維持狀態(tài)信息的情形。 清單 23. 自實現 SupplierStream.generate(new PersonSupplier()). limit(10). forEach(p -> System.out.println(p.getName() + ", " + p.getAge())); private class PersonSupplier implements Supplier<Person> { private int index = 0; private Random random = new Random(); @Override public Person get() { return new Person(index++, "StormTestUser" + index, random.nextInt(100)); } } 輸出結果: StormTestUser1, 9 StormTestUser2, 12 StormTestUser3, 88 StormTestUser4, 51 StormTestUser5, 22 StormTestUser6, 28 StormTestUser7, 81 StormTestUser8, 51 StormTestUser9, 4 StormTestUser10, 76 Stream.iterate iterate 跟 reduce 操作很像,接受一個種子值,和一個 UnaryOperator(例如 f)。然后種子值成為 Stream 的第一個元素,f(seed) 為第二個,f(f(seed)) 第三個,以此類推。 清單 24. 生成一個等差數列 Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));. 輸出結果: 0 3 6 9 12 15 18 21 24 27 與 Stream.generate 相仿,在 iterate 時候管道必須有 limit 這樣的操作來限制 Stream 大小。 進階:用 Collectors 來進行 reduction 操作java.util.stream.Collectors 類的主要作用就是輔助進行各類有用的 reduction 操作,例如轉變輸出為 Collection,把 Stream 元素進行歸組。 groupingBy/partitioningBy 清單 25. 按照年齡歸組Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier()). limit(100). collect(Collectors.groupingBy(Person::getAge)); Iterator it = personGroups.entrySet().iterator(); while (it.hasNext()) { Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next(); System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size()); } 上面的 code,首先生成 100 人的信息,然后按照年齡歸組,相同年齡的人放到同一個 list 中,可以看到如下的輸出: Age 0 = 2 Age 1 = 2 Age 5 = 2 Age 8 = 1 Age 9 = 1 Age 11 = 2 …… 清單 26. 按照未成年人和成年人歸組Map<Boolean, List<Person>> children = Stream.generate(new PersonSupplier()). limit(100). collect(Collectors.partitioningBy(p -> p.getAge() < 18)); System.out.println("Children number: " + children.get(true).size()); System.out.println("Adult number: " + children.get(false).size()); 輸出結果: Children number: 23 Adult number: 77 在使用條件“年齡小于 18”進行分組后可以看到,不到 18 歲的未成年人是一組,成年人是另外一組。partitioningBy 其實是一種特殊的 groupingBy,它依照條件測試的是否兩種結果來構造返回的數據結構,get(true) 和 get(false) 能即為全部的元素對象。 結束語總之,Stream 的特性可以歸納為:
|
|
來自: land_zhj > 《jdk8 lambda》