Euphoria Java 8 DSL
What is Euphoria
Euphoria is an easy to use Java 8 API built on top of Beam’s Java SDK. The API provides a high-level abstraction of data transformations, with a focus on Java 8 language features (e.g. lambdas and streams). It is fully interoperable with the existing Beam SDK and convertible back and forth. It allows fast prototyping through the use of (optional) Kryo-based coders, lambdas and high-level operators, and can be seamlessly integrated into existing Beam Pipelines.
The Euphoria API project was started in 2014, with the clear goal of providing the main building block for Seznam.cz’s data infrastructure. In 2015, the DataFlow whitepaper inspired the original authors to go one step further and also provide a unified API for both stream and batch processing. The API was open-sourced in 2016 and is still in active development. As the Beam community’s goal was very similar, we decided to contribute the API as a high-level DSL over the Beam Java SDK and share our effort with the community.
Euphoria DSL integration is still a work in progress and is tracked as part of BEAM-3900.
WordCount Example
Let’s start with a small example.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);
// Source of data loaded from Beam IO.
PCollection<String> input =
    pipeline
        .apply(Create.of(textLineByLine))
        .setTypeDescriptor(TypeDescriptor.of(String.class));
// FlatMap emits zero, one, or more output elements per input element.
// From input lines we will get data set of words.
PCollection<String> words =
    FlatMap.named("TOKENIZER")
        .of(input)
        .using(
            (String line, Collector<String> context) -> {
              for (String word : Splitter.onPattern("\\s+").split(line)) {
                context.collect(word);
              }
            })
        .output();
// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
    CountByKey.named("COUNT")
        .of(words)
        .keyBy(w -> w)
        .output();
// Format output.
PCollection<String> output =
    MapElements.named("FORMAT")
        .of(counted)
        .using(p -> p.getKey() + ": " + p.getValue())
        .output();
// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
    .apply(TextIO.write()
        .to("counted_words"));
pipeline.run();
Euphoria Guide
The Euphoria API is composed of a set of operators, which allow you to construct a Pipeline according to your application needs.
Inputs and Outputs
Input data can be supplied through Beam’s IO into a PCollection, the same way as in Beam.
Adding Operators
The real power of the Euphoria API is in its operator suite. Each Operator consumes one or more inputs and produces one output PCollection. Let’s take a look at a simple MapElements example.
The operator consumes input, applies the given lambda expression (String::valueOf) to each element of input and returns the mapped PCollection. The developer is guided through a series of steps when creating an operator, so the declaration of an operator is straightforward. To start building an operator, just write its name and ‘.’ (dot). Your IDE will give you hints.
The first step in building any operator is to give it a name through the named() method. The name is propagated through the system and can later be used when debugging.
Coders and Types
Beam’s Java SDK requires developers to supply a Coder for each custom element type in order to have a way of materializing elements. Euphoria allows Kryo to be used for serialization. The Kryo extension is located in the :sdks:java:extensions:kryo module.
//gradle
dependencies {
    compile "org.apache.beam:beam-sdks-java-extensions-kryo:${beam.version}"
}
//maven
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-kryo</artifactId>
    <version>${beam.version}</version>
</dependency>
All you need is to create a KryoCoderProvider and register it to your Pipeline. There are two ways of doing that.
When prototyping you may decide not to care much about coders. In that case, create a KryoCoderProvider without any class registrations to Kryo. KryoCoderProvider will then return a KryoCoder for every non-primitive element type. That of course degrades performance, since Kryo is not able to serialize instances of unknown types efficiently. But it boosts the speed of pipeline development. This behavior is enabled by default and can be disabled when creating the Pipeline through KryoOptions.
The second, more performance-friendly way is to register all the types which Kryo will serialize. Sometimes it is also a good idea to register your own Kryo serializers. Euphoria allows you to do that by implementing your own KryoRegistrar and using it when creating the KryoCoderProvider.
A TypeDescriptor can optionally be supplied every time a new type is introduced during Operator construction. An operator falls back to TypeDescriptor<Object> when no TypeDescriptor is supplied by the user. So KryoCoderProvider may return KryoCoder<Object> for every element with unknown type, if allowed by KryoOptions. Supplying TypeDescriptors becomes mandatory when using .setKryoRegistrationRequired(true).
Metrics and Accumulators
Statistics about a job’s internals are very helpful during the development of distributed jobs. Euphoria calls them accumulators. They are accessible through the environment Context, which can be obtained from a Collector whenever you are working with one. A Collector is usually present when zero-to-many output elements are expected from an operator, for example in the case of FlatMap.
MapElements also allows the Context to be accessed, by supplying an implementation of UnaryFunctionEnv (which adds a second context argument) instead of UnaryFunction.
Windowing
Euphoria follows the same windowing principles as the Beam Java SDK. Every shuffle operator (an operator which needs to shuffle data over the network) allows you to set it. The same parameters as in Beam are required: WindowFn, Trigger, WindowingStrategy and others. When building an operator, users are guided to either set all mandatory and several optional parameters, or none. Windowing is propagated down through the Pipeline.
PCollection<KV<Integer, Long>> countedElements =
    CountByKey.of(input)
        .keyBy(e -> e)
        .windowBy(FixedWindows.of(Duration.standardSeconds(1)))
        .triggeredBy(DefaultTrigger.of())
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(5))
        .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
        .withTimestampCombiner(TimestampCombiner.EARLIEST)
        .output();
How to get Euphoria
Euphoria is located in the dsl-euphoria branch, in the beam-sdks-java-extensions-euphoria module of the Apache Beam project. To build the euphoria subproject call:
./gradlew beam-sdks-java-extensions-euphoria:build
Operator Reference
Operators are basically higher-level data transformations, which allow you to build the business logic of your data processing job in a simple way. All the Euphoria operators are documented in this section, including examples. For the sake of simplicity, the examples do not apply windowing. Refer to the windowing section for more details.
CountByKey
Counts elements with the same key. Requires the input dataset to be mapped by a given key extractor (UnaryFunction) to keys, which are then counted. Output is emitted as KV<K, Long> (K is the key type), where each KV contains a key and the number of elements in the input dataset for that key.
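The counting semantics described above can be illustrated without Beam. The following is a minimal plain-JDK sketch, not Euphoria code: the class name CountByKeySketch and the method countByKey are hypothetical, and an in-memory List stands in for a PCollection.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Plain-JDK illustration of count-by-key semantics; this is not the Euphoria API. */
final class CountByKeySketch {

  /** Extracts a key from every element and counts the elements per key. */
  static <T, K extends Comparable<K>> Map<K, Long> countByKey(
      List<T> input, Function<T, K> keyExtractor) {
    return input.stream()
        .collect(Collectors.groupingBy(keyExtractor, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<String> words = List.of("a", "b", "a", "c", "a", "b");
    // Each word is its own key, as in keyBy(w -> w).
    System.out.println(countByKey(words, w -> w)); // prints {a=3, b=2, c=1}
  }
}
```

Each map entry plays the role of one KV<K, Long> in the operator output.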
Distinct
Outputs distinct (based on the equals method) elements. It takes an optional UnaryFunction mapper parameter which maps elements to the output type.
Distinct with mapper.
Join
Represents an inner join of two (left and right) datasets on a given key, producing a new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as LeftT and RightT. The join itself is performed by a user-supplied BinaryFunctor which consumes elements from both datasets sharing the same key and outputs the result of the join (OutputT). The operator emits an output dataset of type KV<K, OutputT>.
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
    Join.named("join-length-to-words")
        .of(left, right)
        .by(le -> le, String::length) // key extractors
        .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
        .output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]
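To make the pairing rule of an inner join concrete, here is a minimal plain-JDK sketch. It is not how Euphoria or Beam implement joins: the class InnerJoinSketch and its nested-loop innerJoin method are hypothetical and only mirror the semantics (every left element is paired with every right element that has the same key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-JDK illustration of inner-join semantics; this is not the Euphoria API. */
final class InnerJoinSketch {

  /** Emits (key, "left+right") for every left/right pair whose extracted keys are equal. */
  static <L, R, K> List<Map.Entry<K, String>> innerJoin(
      List<L> left, List<R> right, Function<L, K> leftKey, Function<R, K> rightKey) {
    List<Map.Entry<K, String>> out = new ArrayList<>();
    for (L l : left) {
      K key = leftKey.apply(l);
      for (R r : right) {
        if (key.equals(rightKey.apply(r))) {
          out.add(Map.entry(key, l + "+" + r));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> left = List.of(1, 2, 3, 0, 4, 3, 1);
    List<String> right = List.of("mouse", "rat", "elephant", "cat", "X", "duck");
    List<Map.Entry<Integer, String>> joined = innerJoin(left, right, l -> l, String::length);
    // Left elements 2 and 0 have no partner and are dropped, leaving 7 pairs.
    System.out.println(joined.size());
    System.out.println(joined);
  }
}
```

A distributed runner groups both sides by key instead of looping over all pairs, so the order of the emitted elements is not guaranteed there.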
LeftJoin
Represents a left join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as LeftT and RightT. The join itself is performed by a user-supplied BinaryFunctor which consumes one element from each dataset sharing the same key, where the right element is optional, and outputs the result of the join (OutputT). The operator emits an output dataset of type KV<K, OutputT>.
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
    LeftJoin.named("left-join-length-to-words")
        .of(left, right)
        .by(le -> le, String::length) // key extractors
        .using(
            (Integer l, Optional<String> r, Collector<String> c) ->
                c.collect(l + "+" + r.orElse(null)))
        .output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
Euphoria supports a performance optimization called ‘Broadcast Hash Join’ for LeftJoin. A broadcast join can be very efficient when joining two datasets where one fits in memory (in LeftJoin the right dataset has to fit in memory). How to use ‘Broadcast Hash Join’ is described in the Translation section.
RightJoin
Represents a right join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as LeftT and RightT. The join itself is performed by a user-supplied BinaryFunctor which consumes one element from each dataset sharing the same key, where the left element is optional, and outputs the result of the join (OutputT). The operator emits an output dataset of type KV<K, OutputT>.
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
    RightJoin.named("right-join-length-to-words")
        .of(left, right)
        .by(le -> le, String::length) // key extractors
        .using(
            (Optional<Integer> l, String r, Collector<String> c) ->
                c.collect(l.orElse(null) + "+" + r))
        .output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
Euphoria supports a performance optimization called ‘Broadcast Hash Join’ for RightJoin. A broadcast join can be very efficient when joining two datasets where one fits in memory (in RightJoin the left dataset has to fit in memory). How to use ‘Broadcast Hash Join’ is described in the Translation section.
FullJoin
Represents a full outer join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as LeftT and RightT. The join itself is performed by a user-supplied BinaryFunctor which consumes one element from each dataset sharing the same key, where both elements are optional, and outputs the result of the join (OutputT). The operator emits an output dataset of type KV<K, OutputT>.
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
    FullJoin.named("join-length-to-words")
        .of(left, right)
        .by(le -> le, String::length) // key extractors
        .using(
            (Optional<Integer> l, Optional<String> r, Collector<String> c) ->
                c.collect(l.orElse(null) + "+" + r.orElse(null)))
        .output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
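The role of the two Optional arguments in a full outer join can be sketched in plain Java. This is only an illustration under simplifying assumptions: FullJoinSketch, fullJoin and format are hypothetical names, the inputs are in-memory lists, and the key of a right element is its length, as in the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Plain-JDK illustration of full-outer-join semantics; this is not the Euphoria API. */
final class FullJoinSketch {

  /** Formats one result as "key: left+right", printing "null" for a missing side. */
  static String format(int key, Optional<Integer> l, Optional<String> r) {
    return key + ": " + l.map(Object::toString).orElse("null") + "+" + r.orElse("null");
  }

  /** Joins numbers with words of that length and keeps unmatched elements of both sides. */
  static List<String> fullJoin(List<Integer> left, List<String> right) {
    List<String> out = new ArrayList<>();
    for (Integer l : left) {
      boolean matched = false;
      for (String r : right) {
        if (l == r.length()) {
          out.add(format(l, Optional.of(l), Optional.of(r)));
          matched = true;
        }
      }
      if (!matched) {
        // Left element without a partner: the right side is empty.
        out.add(format(l, Optional.of(l), Optional.empty()));
      }
    }
    for (String r : right) {
      if (!left.contains(r.length())) {
        // Right element without a partner: the left side is empty.
        out.add(format(r.length(), Optional.empty(), Optional.of(r)));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> left = List.of(1, 2, 3, 0, 4, 3, 1);
    List<String> right = List.of("mouse", "rat", "elephant", "cat", "X", "duck");
    fullJoin(left, right).forEach(System.out::println);
  }
}
```

Dropping the second loop gives left-join behavior, and dropping the unmatched branch of the first loop while keeping the second gives right-join behavior.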
MapElements
Transforms one input element of input type InputT to one output element of another (potentially the same) OutputT type. The transformation is done through a user-specified UnaryFunction.
FlatMap
Transforms one input element of input type InputT to zero or more output elements of another (potentially the same) OutputT type. The transformation is done through a user-specified UnaryFunctor, where a Collector<OutputT> is used to emit output elements. Notice the similarity with MapElements, which always emits exactly one element.
// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
    FlatMap.named("str2char")
        .of(words)
        .using(
            (String s, Collector<String> collector) -> {
              for (int i = 0; i < s.length(); i++) {
                char c = s.charAt(i);
                collector.collect(String.valueOf(c));
              }
            })
        .output();
// letters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."]
FlatMap may be used to determine the time-stamp of elements. It is done by supplying an implementation of the ExtractEventTime time extractor when building it. There is a specialized AssignEventTime operator to assign time-stamps to elements. Consider using it, your code may be more readable.
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' method returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
    FlatMap.named("extract-event-time")
        .of(events)
        .using((SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
        .eventTimeBy(SomeEventObject::getEventTimeInMillis)
        .output();
// Euphoria will now know event time for each event
Filter
Filter throws away all the elements which do not pass a given condition. The condition is supplied by the user as an implementation of UnaryPredicate. Input and output elements are of the same type.
ReduceByKey
Performs aggregation of InputT type elements with the same key through a user-supplied reduce function. The key is extracted from each element through a UnaryFunction which takes an input element and outputs its key of type K. Elements can optionally be mapped to a value of type V. This happens before the elements are shuffled, so it can have a positive influence on performance.
Finally, elements with the same key are aggregated by a user-defined ReduceFunctor, ReduceFunction or CombinableReduceFunction. They differ in the number of arguments they take and in the way the output is interpreted. ReduceFunction is basically a function which takes a Stream of elements as input and outputs one aggregation result. ReduceFunctor takes a second argument, a Collector, which allows access to the Context. When a CombinableReduceFunction is provided, partial reduction is performed before the shuffle, so less data has to be transported over the network.
The following example shows basic usage of the ReduceByKey operator, including value extraction.
// suppose animals contains: [ "mouse", "rat", "elephant", "cat", "X", "duck" ]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
    ReduceByKey.named("to-letters-counts")
        .of(animals)
        .keyBy(String::length) // length of animal name will be used as grouping key
        // we need to count each animal name once, so why not reduce each string to 1
        .valueBy(e -> 1)
        .reduceBy(Stream::count)
        .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
Now suppose that we want to track our ReduceByKey internals using a counter.
// suppose animals contains: [ "mouse", "rat", "elephant", "cat", "X", "duck" ]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
    ReduceByKey.named("to-letters-counts")
        .of(animals)
        .keyBy(String::length) // length of animal name will be used as grouping key
        // we need to count each animal name once, so why not reduce each string to 1
        .valueBy(e -> 1)
        .reduceBy(
            (Stream<Integer> s, Collector<Long> collector) -> {
              collector.collect(s.count());
              collector.asContext().getCounter("num-of-keys").increment();
            })
        .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
Again the same example with optimized combinable output.
// suppose animals contains: [ "mouse", "rat", "elephant", "cat", "X", "duck" ]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
    ReduceByKey.named("to-letters-counts")
        .of(animals)
        .keyBy(String::length) // length of animal name will be used as grouping key
        // we need to count each animal name once, so why not reduce each string to 1
        .valueBy(e -> 1L)
        .combineBy(s -> s.mapToLong(l -> l).sum()) // Stream::count will not be enough
        .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
CombinableReduceFunction has to be associative and commutative to be truly combinable, so that it can be used to compute partial results before the shuffle and then merge the partial results into one. That is why a simple Stream::count will not work in this example, unlike in the previous one.
Euphoria aims to make code easy to write and read. Therefore some support for writing combinable reduce functions, in the form of Fold or folding functions, is already there. It allows the user to supply only the reduction logic (BinaryFunction) and creates a CombinableReduceFunction out of it. The supplied BinaryFunction still has to be associative.
// suppose animals contains: [ "mouse", "rat", "elephant", "cat", "X", "duck" ]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
    ReduceByKey.named("to-letters-counts")
        .of(animals)
        .keyBy(String::length) // length of animal name will be used as grouping key
        // we need to count each animal name once, so why not reduce each string to 1
        .valueBy(e -> 1L)
        .combineBy(Fold.of((l1, l2) -> l1 + l2))
        .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
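Why Stream::count is not usable as a combinable function can be shown with a small plain-JDK experiment. The sketch below is an assumption-laden model, not Euphoria code: CombinableSketch and reduceInTwoPhases are hypothetical names, and each inner list stands for the values of one key held by one worker before the shuffle.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

/** Plain-JDK model of partial reduction before a shuffle; this is not the Euphoria API. */
final class CombinableSketch {

  /** Applies fn to every partition first and then once more to the partial results. */
  static long reduceInTwoPhases(List<List<Long>> partitions, Function<Stream<Long>, Long> fn) {
    Stream<Long> partials = partitions.stream().map(p -> fn.apply(p.stream()));
    return fn.apply(partials);
  }

  public static void main(String[] args) {
    // Three elements, each mapped to 1L, split over two hypothetical workers.
    List<List<Long>> partitions = List.of(List.of(1L, 1L), List.of(1L));

    long viaSum = reduceInTwoPhases(partitions, s -> s.mapToLong(l -> l).sum());
    long viaCount = reduceInTwoPhases(partitions, Stream::count);

    System.out.println(viaSum);   // prints 3: summing partial sums is still the total
    System.out.println(viaCount); // prints 2: the second phase counts partial results
  }
}
```

Summation gives the same answer no matter how the values are partitioned, while counting applied a second time only counts the number of partial results.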
ReduceWindow
Reduces all elements in a window. The operator corresponds to ReduceByKey with the same key for all elements, so the actual key is defined only by the window.
// suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
// let's assign a time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();
PCollection<Integer> output =
    ReduceWindow.of(withEventTime)
        .combineBy(Fold.of((i1, i2) -> i1 + i2))
        .windowBy(FixedWindows.of(Duration.millis(5000)))
        .triggeredBy(DefaultTrigger.of())
        .discardingFiredPanes()
        .output();
// output will contain: [ 10, 26 ]
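The result [ 10, 26 ] follows from how fixed windows partition the time axis. The following plain-JDK sketch reproduces the arithmetic only. It is not Beam's windowing machinery: FixedWindowSumSketch and sumPerWindow are hypothetical names, triggers and lateness are ignored, and timestamps are assumed to be non-negative.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-JDK illustration of summing per fixed window; this is not the Euphoria API. */
final class FixedWindowSumSketch {

  /** Assigns element i the time-stamp 1000 * i and sums elements per window start. */
  static Map<Long, Integer> sumPerWindow(List<Integer> input, long windowSizeMillis) {
    Map<Long, Integer> sums = new TreeMap<>();
    for (int i : input) {
      long timestamp = 1000L * i;
      // A fixed window starts at the largest multiple of the size not above the time-stamp.
      long windowStart = timestamp - (timestamp % windowSizeMillis);
      sums.merge(windowStart, i, Integer::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8);
    // Elements 1..4 fall into [0, 5000) and elements 5..8 into [5000, 10000).
    System.out.println(sumPerWindow(input, 5000L)); // prints {0=10, 5000=26}
  }
}
```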
SumByKey
Sums elements with the same key. Requires the input dataset to be mapped to keys by a given key extractor (UnaryFunction) and to values by a value extractor, also a UnaryFunction, which outputs Long. Those values are then grouped by key and summed. Output is emitted as KV<K, Long> (K is the key type), where each KV contains a key and the sum of the values extracted from the input elements for that key.
Union
Merges at least two datasets of the same type without any guarantee about element ordering.
// suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
// suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
    Union.named("to-animals")
        .of(cats, rodents)
        .output();
// animals will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
TopPerKey
Emits one top-rated element per key. The key of type K is extracted by a given UnaryFunction. Another UnaryFunction extractor allows for conversion of input elements to values of type V. Selection of the top element is based on a score, which is obtained from each element by a user-supplied UnaryFunction called the score calculator. The score type is denoted as ScoreT and it is required to extend Comparable<ScoreT>, so scores of two elements can be compared directly. Output dataset elements are of type Triple<K, V, ScoreT>.
// suppose animals contains: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
    TopPerKey.named("longest-animal-names")
        .of(animals)
        .keyBy(name -> name.charAt(0)) // first character is the key
        .valueBy(UnaryFunction.identity()) // value type is the same as input element type
        .scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
        .output();
// longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
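The selection rule (keep, per key, the element with the highest score) can be sketched with plain collections. The names TopPerKeySketch and topPerKey are hypothetical, the result is a plain Map instead of Triple elements, and ties are resolved in favor of the element seen first, which is an assumption of this sketch and not a statement about Euphoria.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Plain-JDK illustration of top-per-key selection; this is not the Euphoria API. */
final class TopPerKeySketch {

  /** Keeps, for every key, the element with the highest score. */
  static <T, K extends Comparable<K>, S extends Comparable<S>> Map<K, T> topPerKey(
      List<T> input, Function<T, K> keyBy, Function<T, S> scoreBy) {
    Map<K, T> best = new TreeMap<>();
    for (T element : input) {
      best.merge(
          keyBy.apply(element),
          element,
          // Keep the current best unless the new element scores strictly higher.
          (a, b) -> scoreBy.apply(a).compareTo(scoreBy.apply(b)) >= 0 ? a : b);
    }
    return best;
  }

  public static void main(String[] args) {
    List<String> animals =
        List.of("mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar");
    Map<Character, String> top = topPerKey(animals, name -> name.charAt(0), String::length);
    System.out.println(top); // prints {c=caterpillar, d=dinosaur, e=elephant, m=mouse, r=rat}
  }
}
```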
TopPerKey is a shuffle operator, so it allows windowing to be defined.
AssignEventTime
Euphoria needs to know how to extract the time-stamp from elements when windowing is applied. AssignEventTime tells Euphoria how to do that through a given implementation of the ExtractEventTime function.
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' method returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
    AssignEventTime.named("extract-event-time")
        .of(events)
        .using(SomeEventObject::getEventTimeInMillis)
        .output();
// Euphoria will now know event time for each event
Translation
The Euphoria API is built on top of the Beam Java SDK. The API is transparently translated into Beam’s PTransforms in the background.
The fact that the Euphoria API is translated to the Beam Java SDK gives us the option to fine-tune the translation itself. Translation of an Operator is realized through implementations of OperatorTranslator.
Euphoria uses a TranslationProvider to decide which translator should be used. A user of the Euphoria API can supply their own OperatorTranslator through a TranslationProvider by extending EuphoriaOptions.
Euphoria already contains some useful implementations.
TranslationProviders
GenericTranslatorProvider
General TranslationProvider. Allows for registration of an OperatorTranslator in three different ways:
- Registration of an operator-specific translator by operator class.
- Registration of an operator-specific translator by operator class and an additional user-defined predicate.
- Registration of a general (not specific to one operator type) translator with a user-defined predicate.
Order of registration is important, since GenericTranslatorProvider returns the first suitable translator.
GenericTranslatorProvider.newBuilder()
    .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
    .register(
        Join.class,
        (Join op) -> {
          String name = ((Optional<String>) op.getName()).orElse("");
          return name.toLowerCase().startsWith("broadcast");
        },
        new BroadcastHashJoinTranslator<>()) // register by class and predicate
    .register(
        op -> op instanceof CompositeOperator,
        new CompositeOperatorTranslator<>()) // register by predicate only
    .build();
GenericTranslatorProvider is the default provider, see GenericTranslatorProvider.createWithDefaultTranslators().
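Why the order of registration matters for a first-match lookup can be shown with a small stand-alone model. This is a hypothetical sketch and not the GenericTranslatorProvider implementation: FirstMatchRegistrySketch, register and findTranslator are invented names, operators are modeled as plain name strings and translators as labels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Plain-JDK model of a first-match registry; this is not the Euphoria API. */
final class FirstMatchRegistrySketch {

  /** One registration: a predicate on the operator name and the translator label it selects. */
  private static final class Registration {
    final Predicate<String> predicate;
    final String translator;

    Registration(Predicate<String> predicate, String translator) {
      this.predicate = predicate;
      this.translator = translator;
    }
  }

  private final List<Registration> registrations = new ArrayList<>();

  /** Appends a registration; earlier registrations take precedence over later ones. */
  FirstMatchRegistrySketch register(Predicate<String> predicate, String translator) {
    registrations.add(new Registration(predicate, translator));
    return this;
  }

  /** Returns the translator of the first registration whose predicate accepts the operator. */
  Optional<String> findTranslator(String operatorName) {
    return registrations.stream()
        .filter(r -> r.predicate.test(operatorName))
        .map(r -> r.translator)
        .findFirst();
  }

  public static void main(String[] args) {
    FirstMatchRegistrySketch specificFirst =
        new FirstMatchRegistrySketch()
            .register(name -> name.startsWith("broadcast"), "broadcast-hash-join")
            .register(name -> true, "default-join");
    FirstMatchRegistrySketch generalFirst =
        new FirstMatchRegistrySketch()
            .register(name -> true, "default-join")
            .register(name -> name.startsWith("broadcast"), "broadcast-hash-join");

    System.out.println(specificFirst.findTranslator("broadcast-users").get()); // broadcast-hash-join
    System.out.println(generalFirst.findTranslator("broadcast-users").get());  // default-join
  }
}
```

In this model a catch-all registration placed first shadows every later, more specific one, which is why specific registrations should come before general ones.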
CompositeProvider
Implements chaining of TranslationProviders in a given order. That in turn allows for composing a user-defined TranslationProvider with the ones already supplied by the Euphoria API.
Operator Translators
Each Operator needs to be translated to the Beam Java SDK. That is done by implementations of OperatorTranslator. The Euphoria API contains a translator for every Operator implementation supplied with it.
Some operators may have alternative translations suitable in some cases. Join typically may have many implementations. We describe only the most interesting ones here.
BroadcastHashJoinTranslator
Is able to translate LeftJoin and RightJoin when the whole dataset of one side fits in the memory of the target executor, so it can be distributed using Beam’s side inputs, resulting in better performance.
CompositeOperatorTranslator
Some operators are composite, meaning that they are in fact a wrapped chain of other operators. CompositeOperatorTranslator ensures that they are decomposed to elemental operators during the translation process.
Details
Most of the translation happens in the org.apache.beam.sdk.extensions.euphoria.core.translate package, where the most interesting classes are:
- OperatorTranslator - Interface defining the inner API of the Euphoria to Beam translation.
- TranslatorProvider - Way of supplying custom translators.
- OperatorTransform - Governs the actual translation and/or expansion of Euphoria’s operators to Beam’s PTransform.
- EuphoriaOptions - A PipelineOptions, allows for setting a custom TranslatorProvider.
The package also contains an implementation of OperatorTranslator for each supported operator type (JoinTranslator, FlatMapTranslator, ReduceByKeyTranslator). Not every operator needs to have a translator of its own. Some of them can be composed from other operators. That is why operators may implement CompositeOperator, which gives them the option to be expanded to a set of other Euphoria operators.
The translation process was designed with flexibility in mind. We wanted to allow different ways of translating higher-level Euphoria operators to the Beam SDK’s primitives. This allows for further performance optimizations based on user choices or on knowledge about the data obtained automatically.
Unsupported Features
The original Euphoria contained some features and operators not yet supported in the Beam port. The list of not yet supported features follows:
- ReduceByKey in the original Euphoria was allowed to sort output values (per key). This is not yet translatable into Beam and is therefore not supported.