Nexmark benchmark suite

What it is

Nexmark is a suite of pipelines inspired by the ‘continuous data stream’ queries in Nexmark research paper

These are multiple queries over a three entities model representing on online auction system:

The queries

The queries exercise many aspects of Beam model:

We have augmented the original queries with five more:

Benchmark workload configuration

Here are some of the knobs of the benchmark workload (see NexmarkConfiguration.java).

These configuration items can be passed to the launch command line.

Events generation (defaults)

Windows (defaults)

Events Proportions (defaults)

Technical

Nexmark output

Here is an example output of the Nexmark benchmark run in streaming mode with the SMOKE suite on the (local) direct runner:

Performance:
  Conf       Runtime(sec)         Events(/sec)         Results
  0000                5,5              18138,9          100000
  0001                4,2              23657,4           92000
  0002                2,2              45683,0             351
  0003                3,9              25348,5             444
  0004                1,6               6207,3              40
  0005                5,0              20173,5              12
  0006                0,9              11376,6             401
  0007              121,4                823,5               1
  0008                2,5              40273,9            6000
  0009                0,9              10695,2             298
  0010                4,0              25025,0               1
  0011                4,4              22655,2            1919
  0012                3,5              28208,7            1919

Benchmark launch configuration

The Nexmark launcher accepts the --runner argument as usual for programs that use Beam PipelineOptions to manage their command line arguments. In addition to this, the necessary dependencies must be configured.

When running via Gradle, the following two parameters control the execution:

-P nexmark.args
    The command line to pass to the Nexmark main program.

-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
    `settings.gradle.kts`.

Test data is deterministically synthesized on demand. The test data may be synthesized in the same pipeline as the query itself, or may be published to Pub/Sub or Kafka.

The query results may be:

Common configuration parameters

Decide if batch or streaming:

--streaming=true

Number of events generators:

--numEventGenerators=4

Queries can be run by their name or by their number (number is still there for backward compatibility, only the queries 0 to 12 have a number)

Run query N:

--query=N

Run query called PASSTHROUGH:

--query=PASSTHROUGH

Available Suites

The suite to run can be chosen using this configuration parameter:

--suite=SUITE

Available suites are:

Google Cloud Dataflow runner specific configuration

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.61.0.jar

Direct runner specific configuration

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores

Spark runner specific configuration

--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true

Kafka source/sink configuration parameters

Set Kafka host/ip (for example, “localhost:9092”):

--bootstrapServers=<kafka host/ip>

Write results into Kafka topic:

--sinkType=KAFKA

Set topic name which will be used for benchmark results:

--kafkaResultsTopic=<topic name>

Write or/and read events into/from Kafka topic:

--sourceType=KAFKA

Set topic name which will be used for benchmark events:

--kafkaTopic=<topic name>

Current status

These tables contain statuses of the queries runs in the different runners. Google Cloud Dataflow status is yet to come.

Batch / Synthetic / Local

QueryDirectSparkFlink
0okokok
1okokok
2okokok
3okokok
4okokok
5okokok
6okokok
7okokok
8okokok
9okokok
10okokok
11okokok
12okokok
BOUNDED_SIDE_INPUT_JOINokokok

Streaming / Synthetic / Local

QueryDirectSpark Issue 18416Flink
0okokok
1okokok
2okokok
3okIssue 18074, BEAM-3961ok
4okokok
5okokok
6okokok
7okBEAM-2112ok
8okokok
9okokok
10okokok
11okokok
12okokok
BOUNDED_SIDE_INPUT_JOINokBEAM-2112ok

Batch / Synthetic / Cluster

Yet to come

Streaming / Synthetic / Cluster

Yet to come

Running Nexmark

Running SMOKE suite on the DirectRunner (local)

The DirectRunner is default, so it is not required to pass -Pnexmark.runner. Here we do it for maximum clarity.

The direct runner does not have separate batch and streaming modes, but the Nexmark launch does.

These parameters leave on many of the DirectRunner’s extra safety checks so the SMOKE suite can make sure there is nothing broken in the Nexmark suite.

Batch Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=false
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

Streaming Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=true
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

Running SMOKE suite on the SparkRunner (local)

The SparkRunner is special-cased in the Nexmark gradle launch. The task will provide the version of Spark that the SparkRunner is built against, and configure logging.

Batch Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true"

Streaming Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true"

Running SMOKE suite on the FlinkRunner (local)

Batch Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

Streaming Mode:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

Running SMOKE suite on Google Cloud Dataflow

Set these up first so the below command is valid

PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>

Launch:

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:google-cloud-dataflow-java" \
    -Pnexmark.args="
        --runner=DataflowRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --project=${PROJECT}
        --zone=${ZONE}
        --workerMachineType=n1-highmem-8
        --stagingLocation=${STAGING_LOCATION}
        --sourceType=PUBSUB
        --pubSubMode=PUBLISH_ONLY
        --pubsubTopic=${PUBSUB_TOPIC}
        --resourceNameMode=VERBATIM
        --manageResources=false
        --numEventGenerators=64
        --numWorkers=16
        --maxNumWorkers=16
        --firstEventRate=100000
        --nextEventRate=100000
        --ratePeriodSec=3600
        --isRateLimited=true
        --avgPersonByteSize=500
        --avgAuctionByteSize=500
        --avgBidByteSize=500
        --probDelayedEvent=0.000001
        --occasionalDelaySec=3600
        --numEvents=0
        --useWallclockEventTime=true
        --usePubsubPublishTime=true
        --experiments=enable_custom_pubsub_sink"

Running query 0 on a Spark cluster with Apache Hadoop YARN

Building package:

./gradlew :sdks:java:testing:nexmark:assemble

Submit to the cluster:

spark-submit \
    --class org.apache.beam.sdk.nexmark.Main \
    --master yarn-client \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.61.0-spark.jar \
        --runner=SparkRunner \
        --query=0 \
        --streamTimeout=60 \
        --streaming=false \
        --manageResources=false \
        --monitorJobs=true"

Nexmark dashboards

Below dashboards are used as a CI mechanism to detect no-regression on the Beam components. They are not supposed to be benchmark comparison of the runners or engines. Especially because:

Dashboards content

At each commit on master, Nexmark suites are run and plots are created on the graphs. All metrics dashboards are hosted at metrics.beam.apache.org.

There are 2 kinds of dashboards:

There are dashboards for these runners (others to come):

Each dashboard contains: