#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nexmark model.
The nexmark suite is a series of queries (streaming pipelines) performed
on a simulation of auction events. The model includes the three roles that
generate events:
- The person who starts an auction or makes a bid (Person).
- The auction item (Auction).
- The bid on an item for auction (Bid).
"""
from apache_beam.coders import coder_impl
from apache_beam.coders.coders import FastCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.testing.benchmarks.nexmark import nexmark_util
class PersonCoder(FastCoder):
  """Coder for ``Person`` values.

  The stream-level work is done by the ``PersonCoderImpl`` instance that
  ``_create_impl`` returns.
  """

  def _create_impl(self):
    # Supply the implementation object that reads/writes Person fields.
    return PersonCoderImpl()

  def is_deterministic(self):
    """Report this coder as deterministic; always returns ``True``."""
    return True

  def to_type_hint(self):
    """Return the ``Person`` class as the type hint for this coder."""
    return Person
class Person(object):
  """Author of an auction or a bid.

  Every constructor argument is stored as an instance attribute. Note that
  the ``email`` argument is stored under the name ``email_address``.
  """

  # Coder instance for Person values, created once at class definition.
  CODER = PersonCoder()

  def __init__(
      self,
      id,
      name,
      email,
      credit_card,
      city,
      state,
      date_time,
      extra=None):
    """Store the given field values on the instance.

    Args:
      id: identifier of the person (PersonCoderImpl writes it as a var-int).
      name: name of the person (written as a UTF-8 string).
      email: email address; stored as ``email_address``.
      credit_card: credit card value (written as a UTF-8 string).
      city: city value (written as a UTF-8 string).
      state: state value (written as a UTF-8 string).
      date_time: value written with the timestamp coder by PersonCoderImpl.
      extra: optional additional payload; defaults to ``None``.
    """
    # Attributes are assigned in the original order on purpose: __repr__
    # passes the whole instance to nexmark_util.model_to_json, which
    # presumably walks the attributes in definition order (confirm).
    self.id = id
    self.name = name
    self.email_address = email  # key
    self.credit_card = credit_card
    self.city = city
    self.state = state
    self.date_time = date_time
    self.extra = extra

  def __repr__(self):
    """Return the value produced by ``nexmark_util.model_to_json(self)``."""
    return nexmark_util.model_to_json(self)
class AuctionCoder(FastCoder):
  """Coder for ``Auction`` values.

  The stream-level work is done by the ``AuctionCoderImpl`` instance that
  ``_create_impl`` returns.
  """

  def _create_impl(self):
    # Supply the implementation object that reads/writes Auction fields.
    return AuctionCoderImpl()

  def is_deterministic(self):
    """Report this coder as deterministic; always returns ``True``."""
    return True

  def to_type_hint(self):
    """Return the ``Auction`` class as the type hint for this coder."""
    return Auction
class Auction(object):
  """Item for auction.

  Every constructor argument is stored as an instance attribute. Note that
  the ``reserve_price`` argument is stored under the name ``reserve``.
  """

  # Coder instance for Auction values, created once at class definition.
  CODER = AuctionCoder()

  def __init__(
      self,
      id,
      item_name,
      description,
      initial_bid,
      reserve_price,
      date_time,
      expires,
      seller,
      category,
      extra=None):
    """Store the given field values on the instance.

    Args:
      id: identifier of the auction (AuctionCoderImpl writes it as a
        var-int).
      item_name: name of the item (written as a UTF-8 string).
      description: item description (written as a UTF-8 string).
      initial_bid: initial bid value (written as a var-int).
      reserve_price: reserve value (written as a var-int); stored as
        ``reserve``.
      date_time: value written with the timestamp coder.
      expires: value written with the timestamp coder.
      seller: seller value (written as a var-int).
      category: category value (written as a var-int).
      extra: optional additional payload; defaults to ``None``.
    """
    # Attributes are assigned in the original order on purpose: __repr__
    # passes the whole instance to nexmark_util.model_to_json, which
    # presumably walks the attributes in definition order (confirm).
    self.id = id
    self.item_name = item_name  # key
    self.description = description
    self.initial_bid = initial_bid
    self.reserve = reserve_price
    self.date_time = date_time
    self.expires = expires
    self.seller = seller
    self.category = category
    self.extra = extra

  def __repr__(self):
    """Return the value produced by ``nexmark_util.model_to_json(self)``."""
    return nexmark_util.model_to_json(self)
class BidCoder(FastCoder):
  """Coder for ``Bid`` values.

  The stream-level work is done by the ``BidCoderImpl`` instance that
  ``_create_impl`` returns.
  """

  def _create_impl(self):
    # Supply the implementation object that reads/writes Bid fields.
    return BidCoderImpl()

  def is_deterministic(self):
    """Report this coder as deterministic; always returns ``True``."""
    return True

  def to_type_hint(self):
    """Return the ``Bid`` class as the type hint for this coder."""
    return Bid
class Bid(object):
  """A bid for an item for auction.

  Every constructor argument is stored as an instance attribute of the
  same name.
  """

  # Coder instance for Bid values, created once at class definition.
  CODER = BidCoder()

  def __init__(
      self,
      auction,
      bidder,
      price,
      date_time,
      extra=None):
    """Store the given field values on the instance.

    Args:
      auction: auction value (BidCoderImpl writes it as a var-int).
      bidder: bidder value (written as a var-int).
      price: bid price (written as a var-int).
      date_time: value written with the timestamp coder.
      extra: optional additional payload; defaults to ``None``.
    """
    # Attributes are assigned in the original order on purpose: __repr__
    # passes the whole instance to nexmark_util.model_to_json, which
    # presumably walks the attributes in definition order (confirm).
    self.auction = auction  # key
    self.bidder = bidder
    self.price = price
    self.date_time = date_time
    self.extra = extra

  def __repr__(self):
    """Return the value produced by ``nexmark_util.model_to_json(self)``."""
    return nexmark_util.model_to_json(self)
class AuctionCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Auction`` values.

  Fields are written and read in one fixed order: id, item_name,
  description, initial_bid, reserve, date_time, expires, seller, category,
  extra. Encoder and decoder must stay in step with each other.
  """

  # Sub-coders shared by all instances, one per primitive field kind.
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    Args:
      value: object exposing the Auction attributes read below.
      stream: output stream handed to the sub-coders.
      nested: not consulted; each field is always written with nested=True.

    NOTE(review): Auction.__init__ defaults ``extra`` to None; confirm the
    UTF-8 string coder accepts None before encoding such values.
    """
    put_int = self._int_coder_impl.encode_to_stream
    put_str = self._str_coder_impl.encode_to_stream
    put_time = self._time_coder_impl.encode_to_stream

    put_int(value.id, stream, True)
    put_str(value.item_name, stream, True)
    put_str(value.description, stream, True)
    put_int(value.initial_bid, stream, True)
    put_int(value.reserve, stream, True)
    put_time(value.date_time, stream, True)
    put_time(value.expires, stream, True)
    put_int(value.seller, stream, True)
    put_int(value.category, stream, True)
    put_str(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in encoding order and build an ``Auction``.

    Args:
      stream: input stream handed to the sub-coders.
      nested: not consulted; each field is always read with nested=True.

    Returns:
      A new ``Auction`` populated from the decoded fields.
    """
    get_int = self._int_coder_impl.decode_from_stream
    get_str = self._str_coder_impl.decode_from_stream
    get_time = self._time_coder_impl.decode_from_stream

    # Reads must happen in exactly the order the encoder wrote them.
    auction_id = get_int(stream, True)
    item_name = get_str(stream, True)
    description = get_str(stream, True)
    initial_bid = get_int(stream, True)
    reserve = get_int(stream, True)
    date_time = get_time(stream, True)
    expires = get_time(stream, True)
    seller = get_int(stream, True)
    category = get_int(stream, True)
    extra = get_str(stream, True)

    return Auction(
        auction_id,
        item_name,
        description,
        initial_bid,
        reserve,
        date_time,
        expires,
        seller,
        category,
        extra)
class BidCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Bid`` values.

  Fields are written and read in one fixed order: auction, bidder, price,
  date_time, extra. Encoder and decoder must stay in step with each other.
  """

  # Sub-coders shared by all instances, one per primitive field kind.
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    Args:
      value: object exposing the Bid attributes read below.
      stream: output stream handed to the sub-coders.
      nested: not consulted; each field is always written with nested=True.

    NOTE(review): Bid.__init__ defaults ``extra`` to None; confirm the
    UTF-8 string coder accepts None before encoding such values.
    """
    put_int = self._int_coder_impl.encode_to_stream
    put_time = self._time_coder_impl.encode_to_stream
    put_str = self._str_coder_impl.encode_to_stream

    put_int(value.auction, stream, True)
    put_int(value.bidder, stream, True)
    put_int(value.price, stream, True)
    put_time(value.date_time, stream, True)
    put_str(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in encoding order and build a ``Bid``.

    Args:
      stream: input stream handed to the sub-coders.
      nested: not consulted; each field is always read with nested=True.

    Returns:
      A new ``Bid`` populated from the decoded fields.
    """
    get_int = self._int_coder_impl.decode_from_stream
    get_time = self._time_coder_impl.decode_from_stream
    get_str = self._str_coder_impl.decode_from_stream

    # Reads must happen in exactly the order the encoder wrote them.
    auction = get_int(stream, True)
    bidder = get_int(stream, True)
    price = get_int(stream, True)
    date_time = get_time(stream, True)
    extra = get_str(stream, True)

    return Bid(auction, bidder, price, date_time, extra)
class PersonCoderImpl(coder_impl.StreamCoderImpl):
  """Stream coder implementation for ``Person`` values.

  Fields are written and read in one fixed order: id, name, email_address,
  credit_card, city, state, date_time, extra. Encoder and decoder must stay
  in step with each other.
  """

  # Sub-coders shared by all instances, one per primitive field kind.
  _int_coder_impl = coder_impl.VarIntCoderImpl()
  _str_coder_impl = StrUtf8Coder().get_impl()
  _time_coder_impl = coder_impl.TimestampCoderImpl()

  def encode_to_stream(self, value, stream, nested):
    """Write every field of ``value`` to ``stream`` in the fixed order.

    Args:
      value: object exposing the Person attributes read below.
      stream: output stream handed to the sub-coders.
      nested: not consulted; each field is always written with nested=True.

    NOTE(review): Person.__init__ defaults ``extra`` to None; confirm the
    UTF-8 string coder accepts None before encoding such values.
    """
    put_int = self._int_coder_impl.encode_to_stream
    put_str = self._str_coder_impl.encode_to_stream
    put_time = self._time_coder_impl.encode_to_stream

    put_int(value.id, stream, True)
    put_str(value.name, stream, True)
    put_str(value.email_address, stream, True)
    put_str(value.credit_card, stream, True)
    put_str(value.city, stream, True)
    put_str(value.state, stream, True)
    put_time(value.date_time, stream, True)
    put_str(value.extra, stream, True)

  def decode_from_stream(self, stream, nested):
    """Read the fields back in encoding order and build a ``Person``.

    Args:
      stream: input stream handed to the sub-coders.
      nested: not consulted; each field is always read with nested=True.

    Returns:
      A new ``Person`` populated from the decoded fields.
    """
    get_int = self._int_coder_impl.decode_from_stream
    get_str = self._str_coder_impl.decode_from_stream
    get_time = self._time_coder_impl.decode_from_stream

    # Reads must happen in exactly the order the encoder wrote them.
    person_id = get_int(stream, True)
    name = get_str(stream, True)
    email = get_str(stream, True)
    credit_card = get_str(stream, True)
    city = get_str(stream, True)
    state = get_str(stream, True)
    date_time = get_time(stream, True)
    extra = get_str(stream, True)

    return Person(
        person_id, name, email, credit_card, city, state, date_time, extra)