#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour
(updated every minute). In CQL syntax::
SELECT Rstream(auction)
FROM (SELECT B1.auction, count(*) AS num
FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
GROUP BY B1.auction)
WHERE num >= ALL (SELECT count(*)
FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
GROUP BY B2.auction);
To make things a bit more dynamic and easier to test we use much shorter
windows, and we'll also preserve the bid counts.
"""
import apache_beam as beam
from apache_beam.testing.benchmarks.nexmark.queries import nexmark_query_util
from apache_beam.testing.benchmarks.nexmark.queries.nexmark_query_util import ResultNames
from apache_beam.transforms import window
[docs]
def load(events, metadata=None, pipeline_options=None):
return (
events
| nexmark_query_util.JustBids()
| 'query5_sliding_window' >> beam.WindowInto(
window.SlidingWindows(
metadata.get('window_size_sec'),
metadata.get('window_period_sec')))
# project out only the auction id for each bid
| 'extract_bid_auction' >> beam.Map(lambda bid: bid.auction)
| 'bid_count_per_auction' >> beam.combiners.Count.PerElement()
| 'bid_max_count' >> beam.CombineGlobally(
MostBidCombineFn()).without_defaults()
# TODO(leiyiz): fanout with sliding window produces duplicated results,
# uncomment after it is fixed
# [https://github.com/apache/beam/issues/20528]
# .with_fanout(metadata.get('fanout'))
| beam.FlatMap(
lambda auc_count: [{
ResultNames.AUCTION_ID: auction, ResultNames.NUM: auc_count[1]
} for auction in auc_count[0]]))
[docs]
class MostBidCombineFn(beam.CombineFn):
"""
combiner function to find auctions with most bid counts
"""
[docs]
def create_accumulator(self):
return [], 0
[docs]
def merge_accumulators(self, accumulators):
max_list = []
max_count = 0
for (accu_list, count) in accumulators:
if count == max_count:
max_list = max_list + accu_list
elif count < max_count:
continue
else:
max_list = accu_list
max_count = count
return max_list, max_count