apache_beam.testing.benchmarks.nexmark.nexmark_util module

Utilities for the Nexmark suite.

The Nexmark suite is a series of queries (streaming pipelines) run against a simulated stream of auction events. This module includes:

  • A Command class used to terminate the streaming jobs launched in nexmark_launcher.py by the DirectRunner.

  • A ParseEventFn DoFn to parse events received from PubSub.

Usage:

To run a process for a certain duration, define in the code:

    command = Command(process_to_terminate, args)
    command.run(timeout=duration)
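The usage above can be illustrated with a minimal, standard-library sketch of the run-with-timeout pattern. `TimedCommand` and `slow_job` are hypothetical names used only for illustration; this is not the Beam implementation. It assumes the callable runs in a daemon thread that the caller stops waiting on once the timeout elapses, and the boolean return value is an addition for the example.

```python
import threading
import time


class TimedCommand:
    """Hypothetical stand-in for Command; not the Beam implementation."""

    def __init__(self, cmd, args):
        self.cmd = cmd
        self.args = args

    def run(self, timeout):
        # Run the callable in a daemon thread so it cannot keep the
        # interpreter alive after the caller stops waiting for it.
        thread = threading.Thread(target=self.cmd, args=self.args, daemon=True)
        thread.start()
        # Wait at most `timeout` seconds, then hand control back.
        thread.join(timeout)
        # True if the callable finished within the timeout (illustrative only).
        return not thread.is_alive()


def slow_job(seconds):
    time.sleep(seconds)


finished_fast = TimedCommand(slow_job, (0.01,)).run(timeout=1.0)
finished_slow = TimedCommand(slow_job, (5.0,)).run(timeout=0.05)
```

Note that in this sketch the thread is abandoned rather than forcibly killed; it ends when the interpreter exits.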

class apache_beam.testing.benchmarks.nexmark.nexmark_util.Command(cmd, args)[source]

Bases: object

run(timeout)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.setup_coder()[source]

class apache_beam.testing.benchmarks.nexmark.nexmark_util.ParseEventFn(*unused_args, **unused_kwargs)[source]

Bases: DoFn

Original parser that converts raw event info into Python objects.

Each event line has the following format:

    person: <id starting with 'p'>,name,email,credit_card,city,state,timestamp,extra
    auction: <id starting with 'a'>,item_name,description,initial_bid,reserve_price,timestamp,expires,seller,category,extra
    bid: <auction starting with 'b'>,bidder,price,timestamp,extra

For example:

    'p12345,maria,maria@maria.com,1234-5678-9012-3456,sunnyvale,CA,1528098831536'
    'a12345,car67,2012 hyundai elantra,15000,20000,1528098831536,20180630,maria,vehicle'
    'b12345,maria,20000,1528098831536'
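As a rough illustration of the line format above, the following sketch dispatches on the first character of the line and maps the comma-separated values onto field names. `parse_event_line` and the plain dictionaries it returns are assumptions for this example; they are not the objects that ParseEventFn produces. Values stay as strings, and a field containing a comma would not survive this naive split.

```python
PERSON_FIELDS = ["id", "name", "email", "credit_card", "city", "state",
                 "timestamp", "extra"]
AUCTION_FIELDS = ["id", "item_name", "description", "initial_bid",
                  "reserve_price", "timestamp", "expires", "seller",
                  "category", "extra"]
BID_FIELDS = ["auction", "bidder", "price", "timestamp", "extra"]

# The first character of the line selects the event type.
FIELDS_BY_PREFIX = {
    "p": ("person", PERSON_FIELDS),
    "a": ("auction", AUCTION_FIELDS),
    "b": ("bid", BID_FIELDS),
}


def parse_event_line(line):
    """Hypothetical helper: return (kind, dict of field name -> string)."""
    line = line.strip()
    if not line or line[0] not in FIELDS_BY_PREFIX:
        raise ValueError("unrecognized event line: %r" % line)
    kind, names = FIELDS_BY_PREFIX[line[0]]
    values = line.split(",")
    # zip stops at the shorter sequence, so a missing trailing `extra`
    # field is simply absent from the result.
    return kind, dict(zip(names, values))


person_kind, person = parse_event_line(
    "p12345,maria,maria@maria.com,1234-5678-9012-3456,sunnyvale,CA,1528098831536")
bid_kind, bid = parse_event_line("b12345,maria,20000,1528098831536")
```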

process(elem)[source]

class apache_beam.testing.benchmarks.nexmark.nexmark_util.ParseJsonEventFn(*unused_args, **unused_kwargs)[source]

Bases: DoFn

Parses raw JSON event info into Python objects.

Each event line has the following format:

    person: {id,name,email,credit_card,city,state,timestamp,extra}
    auction: {id,item_name,description,initial_bid,reserve_price,timestamp,expires,seller,category,extra}
    bid: {auction,bidder,price,timestamp,extra}

For example:

    {"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com","creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY","dateTime":1528098831026,"extra":"WN_HS_bnpVQ[["}

    {"id":1000,"itemName":"wkx mgee","description":"eszpqxtdxrvwmmywkmogoahf","initialBid":28873,"reserve":29448,"dateTime":1528098831036,"expires":1528098840451,"seller":1000,"category":13,"extra":"zcuupiz"}

    {"auction":1000,"bidder":1001,"price":32530001,"dateTime":1528098831066,"extra":"fdiysaV^]NLVsbolvyqwgticfdrwdyiyofWPYTOuwogvszlxjrcNOORM"}
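The JSON examples above can be decoded with the standard `json` module. The sketch below guesses the event type from which keys are present; that classification rule and the name `classify_json_event` are assumptions made for illustration, not a description of how ParseJsonEventFn decides.

```python
import json


def classify_json_event(raw):
    """Hypothetical helper: return (kind, decoded dict) for one JSON event."""
    event = json.loads(raw)
    # Assumed rule: each event type has at least one key the others lack.
    if "auction" in event and "bidder" in event:
        kind = "bid"
    elif "itemName" in event:
        kind = "auction"
    elif "name" in event:
        kind = "person"
    else:
        raise ValueError("unrecognized event: %r" % raw)
    return kind, event


bid_kind, bid = classify_json_event(
    '{"auction":1000,"bidder":1001,"price":32530001,'
    '"dateTime":1528098831066,"extra":"abc"}')
person_kind, person = classify_json_event(
    '{"id":1000,"name":"Peter Jones","emailAddress":"nhd@xcat.com",'
    '"creditCard":"7241 7320 9143 4888","city":"Portland","state":"WY",'
    '"dateTime":1528098831026,"extra":"WN_HS_bnpVQ[["}')
```

Unlike the comma-separated format, numeric fields such as `price` and `dateTime` arrive already typed as integers.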

process(elem)[source]

class apache_beam.testing.benchmarks.nexmark.nexmark_util.CountAndLog(label: str | None = None)[source]

Bases: PTransform

expand(pcoll)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.log_count_info(count)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.display(elm)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.model_to_json(model)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.construct_json_dict(model)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.unnest_to_json(cand)[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.millis_to_timestamp(millis: int) → Timestamp[source]

apache_beam.testing.benchmarks.nexmark.nexmark_util.get_counter_metric(result: PipelineResult, namespace: str, name: str) → int[source]

Gets a specific counter metric from the pipeline result.

Parameters:
  • result – the PipelineResult from which metrics are read

  • namespace – a string representing the namespace of the wanted metric

  • name – a string representing the name of the wanted metric

Returns:

the value of the wanted metric if it exists, otherwise -1

apache_beam.testing.benchmarks.nexmark.nexmark_util.get_start_time_metric(result: PipelineResult, namespace: str, name: str) → int[source]

Gets the start time from all times recorded by the specified distribution metric.

Parameters:
  • result – the PipelineResult from which metrics are read

  • namespace – a string representing the namespace of the wanted metric

  • name – a string representing the name of the wanted metric

Returns:

the smallest time in the metric, or -1 if the metric doesn't exist

apache_beam.testing.benchmarks.nexmark.nexmark_util.get_end_time_metric(result: PipelineResult, namespace: str, name: str) → int[source]

Gets the end time from all times recorded by the specified distribution metric.

Parameters:
  • result – the PipelineResult from which metrics are read

  • namespace – a string representing the namespace of the wanted metric

  • name – a string representing the name of the wanted metric

Returns:

the largest time in the metric, or -1 if the metric doesn't exist
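The start-time and end-time contracts described above (smallest or largest recorded time, or -1 when nothing was recorded) can be sketched without Beam. `DistributionSample`, `start_time`, and `end_time` are hypothetical stand-ins for illustration only; they do not reflect how the real functions query a PipelineResult.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class DistributionSample:
    """Hypothetical stand-in for one distribution result; not a Beam class."""
    min: int
    max: int


def start_time(samples: Iterable[DistributionSample]) -> int:
    # Smallest recorded time across all samples, or -1 if there are none.
    samples = list(samples)
    return min(s.min for s in samples) if samples else -1


def end_time(samples: Iterable[DistributionSample]) -> int:
    # Largest recorded time across all samples, or -1 if there are none.
    samples = list(samples)
    return max(s.max for s in samples) if samples else -1


recorded = [DistributionSample(min=120, max=200),
            DistributionSample(min=100, max=250)]
elapsed = end_time(recorded) - start_time(recorded)
```

Because -1 doubles as the "missing" marker, a caller would want to check both values before computing an elapsed time from them.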