From b56b77daaf9b6bc2e38fbbeca41a8c2ec87ba52e Mon Sep 17 00:00:00 2001 From: Ryan Skraba Date: Sat, 16 Jan 2021 18:33:17 +0100 Subject: [PATCH] feat: Initial commit --- .github/workflows/maven-publish.yml | 22 + .gitignore | 10 + .scalafmt.conf | 1 + LICENSE | 201 +++++++++ core/pom.xml | 30 ++ core/readme.md | 10 + .../beam/enchiridion/core/BloomFilters.java | 126 ++++++ .../core/metrics/CountMetricsDoFn.java | 74 ++++ .../core/metrics/GaugeMetricsDoFn.java | 91 ++++ .../core/pi/MeasureDistanceDoFn.java | 14 + .../core/pi/MonteCarloPiTransform.java | 66 +++ .../beam/enchiridion/core/pi/ThrowDart.java | 13 + .../beam/enchiridion/core/BasicTest.java | 30 ++ .../enchiridion/core/BloomFiltersTest.java | 53 +++ .../core/pi/MeasureDistanceDoFnTest.java | 27 ++ .../core/pi/MonteCarloPiTransformTest.java | 47 ++ pom.xml | 418 ++++++++++++++++++ readme.md | 53 +++ 18 files changed, 1286 insertions(+) create mode 100644 .github/workflows/maven-publish.yml create mode 100644 .gitignore create mode 100644 .scalafmt.conf create mode 100644 LICENSE create mode 100644 core/pom.xml create mode 100644 core/readme.md create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/BloomFilters.java create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/metrics/CountMetricsDoFn.java create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/metrics/GaugeMetricsDoFn.java create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFn.java create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransform.java create mode 100644 core/src/main/java/com/skraba/beam/enchiridion/core/pi/ThrowDart.java create mode 100644 core/src/test/java/com/skraba/beam/enchiridion/core/BasicTest.java create mode 100644 core/src/test/java/com/skraba/beam/enchiridion/core/BloomFiltersTest.java create mode 100644 core/src/test/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFnTest.java create mode 100644 core/src/test/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransformTest.java create mode 100644 pom.xml create mode 100644 readme.md diff --git a/.github/workflows/maven-publish.yml b/.github/workflows/maven-publish.yml new file mode 100644 index 0000000..54f7753 --- /dev/null +++ b/.github/workflows/maven-publish.yml @@ -0,0 +1,22 @@ +name: Java CI + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 11 + uses: actions/setup-java@v1 + with: + java-version: 11 + - name: Cache Maven packages + uses: actions/cache@v2 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + - name: Build with Maven + run: mvn -B package --file pom.xml -P apache-snapshots diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..032da83 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +**/*.iml +**/.idea/ +**/target/ +**/.settings/ +**/.project +**/.classpath +**/.checkstyle +**/.gradle +**/build +**/dependency-reduced-pom.xml diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..010fa6d --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1 @@ +version = "2.7.5" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/core/pom.xml b/core/pom.xml new file mode 100644 index 0000000..fd47ba7 --- /dev/null +++ b/core/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + + com.skraba.beam.enchiridion + beam-enchiridion + 0.0.1-SNAPSHOT + + + beam-enchiridion-core + jar + + Beam Enchiridion :: Core + Beam core examples without any runner dependencies. + + + false + + + + + org.apache.beam + beam-sdks-java-core + + + + diff --git a/core/readme.md b/core/readme.md new file mode 100644 index 0000000..7874b89 --- /dev/null +++ b/core/readme.md @@ -0,0 +1,10 @@ +The Enchiridion: Beam Core +========================== + +Examples for the beam core Java SDK. + +The `pom.xml` relies on `beam-sdks-java-core` but does not have any of the necessary runner +libraries that are necessary to execute the jobs in any given framework. + +This illustrates how to create and test runner-independent Beam model objects like PTransform, +DoFn, side inputs, etc \ No newline at end of file diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/BloomFilters.java b/core/src/main/java/com/skraba/beam/enchiridion/core/BloomFilters.java new file mode 100644 index 0000000..c0d1e42 --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/BloomFilters.java @@ -0,0 +1,126 @@ +package com.skraba.beam.enchiridion.core; + +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.BloomFilter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnel; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnels; + +/** + * Implements transformations for creating and filtering on Bloom filters. + * + *

This uses the vendored guava {@link BloomFilter} packaged with Beam. + */ +public class BloomFilters { + + /** + * @param funnel A consumer for the elements. + * @param expectedSize The expected number of elements in the collection. + * @param fpp The approximate false positive probability desired. + * @return A transformation that creates a singleton view of a BloomFilter with all the elements + * inside. + */ + public static PTransform, PCollectionView>> of( + Funnel funnel, long expectedSize, double fpp) { + return new PTransform<>() { + + @Override + public PCollectionView> expand(PCollection input) { + PCollection> in = + input + .apply(Combine.globally(new BloomFilterCreateFn(funnel, expectedSize, fpp))) + .setCoder(SerializableCoder.of((Class>) (Class) BloomFilter.class)); + return in.apply(View.asSingleton()); + } + }; + } + + /** + * Create a bloom filter over a String collection. + * + * @param expectedSize The expected number of elements in the collection. + * @param fpp The approximate false positive probability desired. + * @return A transformation that creates a singleton view of a BloomFilter with all the elements + * inside. + */ + public static PTransform, PCollectionView>> ofStrings( + long expectedSize, double fpp) { + return of(Funnels.stringFunnel(Charsets.UTF_8), expectedSize, fpp); + } + + /** + * Applies a filter to a collection to remove elements that are definitely not in the filter. + * + *

That is, if an element is excluded by this transformation, it is guaranteed to not have been + * in the collection that created the Bloom filter. If it is included, it was probably (but not + * guaranteed) to have been added to the Bloom filter. + * + * @param bloomFilter The Bloom filter initialized for a filter collection. + * @return A subset of the input where elements were probably also added to the filter collection. + */ + public static PTransform, PCollection> filter( + final PCollectionView> bloomFilter) { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + if (c.sideInput(bloomFilter).mightContain(c.element())) + c.output(c.element()); + } + }) + .withSideInputs(bloomFilter)); + } + }; + } + + /** Create and aggregate a {@link BloomFilter}. */ + private static class BloomFilterCreateFn + extends Combine.CombineFn, BloomFilter> { + private final long expectedSize; + private final double fpp; + private final Funnel funnel; + + public BloomFilterCreateFn(Funnel funnel, long expectedSize, double fpp) { + this.funnel = funnel; + this.expectedSize = expectedSize; + this.fpp = fpp; + } + + @Override + public BloomFilter createAccumulator() { + return BloomFilter.create(funnel, expectedSize, fpp); + } + + @Override + public BloomFilter addInput(BloomFilter mutableAccumulator, T input) { + mutableAccumulator.put(input); + return mutableAccumulator; + } + + @Override + public BloomFilter mergeAccumulators(Iterable> accumulators) { + BloomFilter merged = null; + for (BloomFilter b : accumulators) { + if (merged == null) merged = b; + else merged.putAll(b); + } + return merged; + } + + @Override + public BloomFilter extractOutput(BloomFilter accumulator) { + return accumulator; + } + } +} diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/CountMetricsDoFn.java b/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/CountMetricsDoFn.java new file mode 100644 index 0000000..3cd410e --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/CountMetricsDoFn.java @@ -0,0 +1,74 @@ +package com.skraba.beam.enchiridion.core.metrics; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Passes through all elements, but counts them with Beam metrics. + * + *

The count can be extracted from a {@link PipelineResult} with just the name. + * + * @param Ignored type, passed through. + */ +public class CountMetricsDoFn extends DoFn { + + public static final String DEFAULT_NAME = "count"; + + private final String name; + + private Counter counter; + + private CountMetricsDoFn(String name) { + this.name = name; + this.counter = Metrics.counter(CountMetricsDoFn.class, name); + } + + /** @return a metrics-counting pass-through function with the default name. */ + public static CountMetricsDoFn of() { + return new CountMetricsDoFn(DEFAULT_NAME); + } + + /** @return a metrics-counting pass-through function with the given name. */ + public static CountMetricsDoFn of(String name) { + return new CountMetricsDoFn<>(name); + } + + /** @return an identifying name for this metric */ + public String getName() { + return name; + } + + /** Pass through the data without modification. */ + @ProcessElement + public void processElement(ProcessContext c) { + counter.inc(); + c.output(c.element()); + } + + /** + * Count all of the rows that all instances of this DoFn have encountered. + * + * @param result The results of the pipeline, containing the metrics. + * @return The sum total of all metrics counted by this class. + */ + public static long getCount(PipelineResult result, String name) { + MetricQueryResults metrics = + result + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(CountMetricsDoFn.class, name)) + .build()); + long count = 0; + for (MetricResult c : metrics.getCounters()) { + count += c.getAttempted(); + } + return count; + } +} diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/GaugeMetricsDoFn.java b/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/GaugeMetricsDoFn.java new file mode 100644 index 0000000..a1976d6 --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/metrics/GaugeMetricsDoFn.java @@ -0,0 +1,91 @@ +package com.skraba.beam.enchiridion.core.metrics; + +import java.util.Objects; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** + * Passes through all elements, but retains the last value in a gauge as a long. + * + *

The element is converted into a long via the {@link SerializableFunction} parameter. This is + * the value that is stored in the metrics system and returned via the pipeline results. + * + *

The value can be extracted from a {@link PipelineResult} with just the name. + * + * @param Ignored type, passed through. + */ +public class GaugeMetricsDoFn extends DoFn { + + public static final String DEFAULT_NAME = "count"; + + private final String name; + + private final SerializableFunction converter; + + private final Gauge gauge; + + private GaugeMetricsDoFn(String name, SerializableFunction converter) { + this.name = name; + this.converter = converter; + this.gauge = Metrics.gauge(GaugeMetricsDoFn.class, name); + } + + /** @return a metrics pass-through function with the default name. */ + public static GaugeMetricsDoFn of() { + return new GaugeMetricsDoFn<>(DEFAULT_NAME, GaugeMetricsDoFn::convertToLong); + } + + /** @return a pass-through function with the given name. */ + public static GaugeMetricsDoFn of(String name, SerializableFunction converter) { + return new GaugeMetricsDoFn<>(name, converter); + } + + /** @return a pass-through function with the given name, and using the conversion function. */ + public static GaugeMetricsDoFn of(String name) { + return new GaugeMetricsDoFn<>(name, GaugeMetricsDoFn::convertToLong); + } + + public static Long convertToLong(Object element) { + if (element instanceof Long) return (Long) element; + return 0L; + } + + /** @return an identifying name for this metric */ + public String getName() { + return name; + } + + /** Pass through the data without modification. */ + @ProcessElement + public void processElement(ProcessContext c) { + Long gaugeValue = converter.apply(c.element()); + if (gaugeValue != null) gauge.set(gaugeValue); + c.output(c.element()); + } + + /** + * @param result The results of the pipeline, containing the metrics. + * @return The result associated with the gauge of the given name. + */ + public static long getValue(PipelineResult result, String name) { + MetricQueryResults metrics = + result + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(GaugeMetricsDoFn.class, name)) + .build()); + for (MetricResult c : metrics.getGauges()) { + return Objects.requireNonNull(c.getAttempted()).getValue(); + } + return 0; + } +} diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFn.java b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFn.java new file mode 100644 index 0000000..4cb25c3 --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFn.java @@ -0,0 +1,14 @@ +package com.skraba.beam.enchiridion.core.pi; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** For every incoming point, return its distance squared from the origin. */ +public class MeasureDistanceDoFn extends DoFn, Double> { + + @ProcessElement + public void processElement(ProcessContext context) { + KV point = context.element(); + context.output(point.getKey() * point.getKey() + point.getValue() * point.getValue()); + } +} diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransform.java b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransform.java new file mode 100644 index 0000000..cccaa3c --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransform.java @@ -0,0 +1,66 @@ +package com.skraba.beam.enchiridion.core.pi; + +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Use the Monte Carlo method to calculate PI. + * + *

This is a neat, but inefficient way to calculate PI in a distributed system. It takes a + * collection of input elements. Each input element is ignored and replaced by a random number, used + * in the calculation. The output is the value of Pi from the analysed random inputs. + * + *

A circle with a radius of 1 unit has an area of exactly PI units squared. If the centre is the + * origin, one quarter of the circle will overlap with the positive unit square. + * + *

In other words if you throw a dart randomly in the unit square, you have PI/4 chance that it + * lies within the circle (distance from the origin <= 1 unit). By throwing more and more darts and + * checking the distance of each from the origin, you can get a more and more precise calculation of + * PI. + * + *

Unlike other methods, you have to generate an order of magnitude more dart throws for each + * additional decimal point of precision. + */ +public class MonteCarloPiTransform extends PTransform, PCollection> { + + @Override + public PCollection expand(PCollection input) { + // Count the number of incoming records in total. + final PCollectionView count = + input.apply(Combine.globally(Count.combineFn()).withoutDefaults().asSingletonView()); + + // Turn each record into a dart throw. + PCollection> points = + input.apply("RewriteInputs", MapElements.via(new ThrowDart())); + // Calculate the distance (squared) from the origin of each throw. + PCollection distance = + points.apply("MeasureDistance", ParDo.of(new MeasureDistanceDoFn())); + // Throw away all records that are outside of the circle. + PCollection inCircle = distance.apply("FilterInCircle", Filter.lessThanEq(1.0d)); + + // Count the number of darts inside the circle. + PCollection inCircleCount = inCircle.apply(Count.globally()); + + // Do the division in a PTransform. + return inCircleCount.apply( + "MonteCarlePi", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + // The element is the number of darts in the circle. + // The side input is the total number of darts thrown. + c.output(4.0d * c.element() / c.sideInput(count)); + } + }) + .withSideInputs(count)); + } +} diff --git a/core/src/main/java/com/skraba/beam/enchiridion/core/pi/ThrowDart.java b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/ThrowDart.java new file mode 100644 index 0000000..f18cc9d --- /dev/null +++ b/core/src/main/java/com/skraba/beam/enchiridion/core/pi/ThrowDart.java @@ -0,0 +1,13 @@ +package com.skraba.beam.enchiridion.core.pi; + +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; + +/** For every incoming record,throw away its data and return a random point in the unit square. */ +public class ThrowDart extends SimpleFunction> { + + @Override + public KV apply(Long input) { + return KV.of(Math.random(), Math.random()); + } +} diff --git a/core/src/test/java/com/skraba/beam/enchiridion/core/BasicTest.java b/core/src/test/java/com/skraba/beam/enchiridion/core/BasicTest.java new file mode 100644 index 0000000..c32344e --- /dev/null +++ b/core/src/test/java/com/skraba/beam/enchiridion/core/BasicTest.java @@ -0,0 +1,30 @@ +package com.skraba.beam.enchiridion.core; + +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** Simple unit tests using Beam. */ +public class BasicTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testCountOInString() { + PCollection in = + pipeline.apply( + "In", + Create.of( + "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten")); + + PAssert.that(in.apply("AssertCount", Count.globally())).containsInAnyOrder(10L); + PCollection withO = in.apply(Filter.by(s -> s.contains("o"))); + PAssert.that(withO.apply("AssertWithO", Count.globally())).containsInAnyOrder(3L); + + pipeline.run(); + } +} diff --git a/core/src/test/java/com/skraba/beam/enchiridion/core/BloomFiltersTest.java b/core/src/test/java/com/skraba/beam/enchiridion/core/BloomFiltersTest.java new file mode 100644 index 0000000..ba2d06f --- /dev/null +++ b/core/src/test/java/com/skraba/beam/enchiridion/core/BloomFiltersTest.java @@ -0,0 +1,53 @@ +package com.skraba.beam.enchiridion.core; + +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.BloomFilter; +import org.junit.Rule; +import org.junit.Test; + +/** Unit tests for {@link BloomFilters}. */ +public class BloomFiltersTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testBasic() { + + // Create a filter with "0" to "999" + PCollectionView> filter = + pipeline + .apply("FilterContentsGenerate", GenerateSequence.from(0).to(1000)) + .apply( + "FilterContentsToString", + MapElements.into(TypeDescriptors.strings()).via((String::valueOf))) + .apply("ToBloom", BloomFilters.ofStrings(1000, 0.01)); + + // The input dataset has "500" to "1499" + PCollection in = + pipeline + .apply("InputGenerate", GenerateSequence.from(500).to(1500)) + .apply( + "InputToString", + MapElements.into(TypeDescriptors.strings()).via((String::valueOf))); + + // Use the Bloom filter on the input dataset. There should only be 500 exact matches, and ~1% + // of 500 (~5) false positives. + PCollection filtered = in.apply(BloomFilters.filter(filter)); + + // In fact, there are exactly 6 false positives. + PAssert.that(filtered.apply("AssertFilteredCount", Count.globally())).containsInAnyOrder(506L); + PCollection falsePositives = filtered.apply(Filter.by(s -> s.length() > 3)); + PAssert.that(falsePositives.apply("AssertFalsePositive", Count.globally())) + .containsInAnyOrder(6L); + PAssert.that(falsePositives).containsInAnyOrder("1056", "1137", "1220", "1263", "1269", "1392"); + + pipeline.run(); + } +} diff --git a/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFnTest.java b/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFnTest.java new file mode 100644 index 0000000..d3b4b1e --- /dev/null +++ b/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFnTest.java @@ -0,0 +1,27 @@ +package com.skraba.beam.enchiridion.core.pi; + +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** Unit tests for {@link MeasureDistanceDoFn}. */ +public class MeasureDistanceDoFnTest { + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testBasic() { + PCollection in = + pipeline + .apply(Create.of(KV.of(0d, 0d), KV.of(0.5d, 0.5d), KV.of(1d, 1d), KV.of(3d, 4d))) + .apply("measure", ParDo.of(new MeasureDistanceDoFn())); + + PAssert.that(in).containsInAnyOrder(0d, 0.5d, 2d, 25d); + pipeline.run(); + } +} diff --git a/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransformTest.java b/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransformTest.java new file mode 100644 index 0000000..9419a1d --- /dev/null +++ b/core/src/test/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransformTest.java @@ -0,0 +1,47 @@ +package com.skraba.beam.enchiridion.core.pi; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; + +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** Unit tests for {@link MonteCarloPiTransform}. */ +public class MonteCarloPiTransformTest { + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testBasic() { + PCollection pi = + pipeline.apply(GenerateSequence.from(0).to(1000)).apply(new MonteCarloPiTransform()); + + // An assertion on a transform. + PAssert.that(pi.apply(Count.globally())).containsInAnyOrder(1L); + + // An assertion on every element in a collection. + PAssert.that(pi) + .satisfies( + elements -> { + for (Double approximation : elements) { + assertThat(approximation, closeTo(Math.PI, 0.2)); + } + return null; + }); + + // An assertion on the single element. + PAssert.thatSingleton(pi) + .satisfies( + element -> { + assertThat(element, closeTo(Math.PI, 0.2)); + return null; + }); + + pipeline.run(); + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..67fa314 --- /dev/null +++ b/pom.xml @@ -0,0 +1,418 @@ + + 4.0.0 + + com.skraba.beam.enchiridion + beam-enchiridion + 0.0.1-SNAPSHOT + + Beam Enchiridion :: Parent + pom + + + + true + + UTF-8 + + 11 + 11 + + + 2.27.0 + 3.1.1 + 2.2 + 4.13.1 + 2.9.2 + 2.12 + 2.12.13 + 2.7.5 + 1.25.1 + 3.2.2 + 1.7.30 + + + 3.0.0-M3 + 3.0.0-M5 + 3.2.0 + 4.4.0 + 2.0.2 + 3.2.4 + 2.7.0 + 3.0.0-M5 + + + + + + local-snapshots + + + local.snapshots + Local Snapshot Repository + file:///tmp//snapshots/ + + false + + + true + + + + + + + + apache-snapshots + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + apache-staging + + + apache.staging + Apache Development Staging Repository + https://repository.apache.org/content/repositories/staging/ + + true + + + + + + apache.staging + Apache Development Staging Repository + https://repository.apache.org/content/repositories/staging/ + + + + + + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + ${plugin.enforcer.maven.version} + + + enforce-maven + + enforce + + + + + 3.5 + + + + + + + + + + maven-jar-plugin + 3.2.0 + + + + ${exec.mainClass} + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${plugin.shade.maven.version} + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + package + + shade + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${plugin.surefire.maven.version} + + src/test/java + src/test/scala + + **/*Spec.java + **/*Test.java + + + + + maven-failsafe-plugin + ${plugin.failsafe.maven.version} + + + + + net.alchim31.maven + scala-maven-plugin + ${plugin.maven.scala.version} + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + + com.diffplug.spotless + spotless-maven-plugin + ${plugin.spotless.version} + + + + + + + + src/main/scala/**/*.scala + src/test/scala/**/*.scala + + + ${scalafmt.version} + + + + + + + + org.scalatest + scalatest-maven-plugin + ${plugin.maven.scalatest.version} + + + test + + test + + + + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + net.alchim31.maven + scala-maven-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + org.scalatest + scalatest-maven-plugin + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + + org.apache.beam + beam-runners-spark + ${beam.version} + + + org.apache.beam + beam-runners-direct-java + ${beam.version} + + + org.apache.beam + beam-sdks-java-core + ${beam.version} + + + org.apache.beam + beam-sdks-java-nexmark + ${beam.version} + + + org.apache.beam + beam-sdks-java-io-parquet + ${beam.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.scala-lang + scala-library + ${scala.version} + + + com.typesafe.play + play-json_${scala.minor.version} + ${play.json.version} + + + + + junit + junit + ${junit4.version} + test + + + org.hamcrest + hamcrest-core + ${hamcrest.version} + test + + + org.hamcrest + hamcrest-library + ${hamcrest.version} + test + + + org.scalatest + scalatest_${scala.minor.version} + ${scalatest.version} + test + + + org.slf4j + slf4j-simple + ${slf4j.version} + test + + + + + + + + junit + junit + test + + + + org.apache.beam + beam-runners-direct-java + test + + + org.hamcrest + hamcrest-core + test + + + org.hamcrest + hamcrest-library + test + + + org.scalatest + scalatest_${scala.minor.version} + test + + + + + core + + + diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..e9da02b --- /dev/null +++ b/readme.md @@ -0,0 +1,53 @@ +The Beam Enchiridion +==================== + +![Java CI](https://github.com/RyanSkraba/beam-enchiridion/workflows/Java%20CI/badge.svg) + +_[**Enchiridion**](https://en.wikipedia.org/wiki/Enchiridion): **A small manual or handbook.**_ It's a bit like a tech [cook book](https://www.oreilly.com/search/?query=cookbook), but a bigger, fancier, SEO-optimizabler word. + + + +This project describes how to do many common tasks using [Beam](https://beam.apache.org). + +Topics +------ + +| I want to... | See... | +| ------------- | ------------- | +| write a simple ParDo | [ThrowDart](core/src/main/java/com/skraba/beam/enchiridion/core/pi/ThrowDart.java), [MeasureDistanceDoFn](core/src/main/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFn.java) ([_test_](core/src/test/java/com/skraba/beam/enchiridion/core/pi/MeasureDistanceDoFnTest.java))| +| write a composite PTransform | [MonteCarloPiTransform](core/src/main/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransform.java) ([_test_](core/src/test/java/com/skraba/beam/enchiridion/core/pi/MonteCarloPiTransformTest.java))| +| write a source | | +| write a sdf | | +| side input | | +| state | | +| metrics | | +| streaming | | +| my own windowing function | | + +Modules +------- + +| module | description | +| ------------- | ------------- | +| [core](core/readme.md) | Examples for the beam core Java SDK. | + + +[scio]: https://github.com/spotify/scio + + +Running with a locally-built SNAPSHOT +------------------------------------- + +```bash +# Build beam artifacts and publish to /tmp/snapshots/ +cd beam +./gradlew -Ppublishing -PdistMgmtSnapshotsUrl=/tmp/snapshots/ publishToMavenLocal + +# Build this project against the snapshots. +cd beam-enchiridion +mvn -Dbeam.version=2.x.0-SNAPSHOT -Plocal-snapshot clean install +``` + +