Source code for

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from abc import ABC
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Union

import numpy
import pandas
import scipy

import datatable
import xgboost
from import FileSystems
from import ExampleT
from import ModelHandler
from import ModelT
from import PredictionResult
from import PredictionT

__all__ = [

XGBoostInferenceFn = Callable[[
    Union[xgboost.Booster, xgboost.XGBModel],
    Optional[Dict[str, Any]]

def default_xgboost_inference_fn(
    batch: Sequence[object],
    model: Union[xgboost.Booster, xgboost.XGBModel],
    inference_args: Optional[Dict[str,
                                  Any]] = None) -> Iterable[PredictionResult]:
  inference_args = {} if not inference_args else inference_args

  if type(model) == xgboost.Booster:
    batch = [xgboost.DMatrix(array) for array in batch]
  predictions = [model.predict(el, **inference_args) for el in batch]

  return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

[docs] class XGBoostModelHandler(ModelHandler[ExampleT, PredictionT, ModelT], ABC): def __init__( self, model_class: Union[Callable[..., xgboost.Booster], Callable[..., xgboost.XGBModel]], model_state: str, inference_fn: XGBoostInferenceFn = default_xgboost_inference_fn, *, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, **kwargs): """Implementation of the ModelHandler interface for XGBoost. Example Usage:: pcoll | RunInference( XGBoostModelHandler( model_class="XGBoost Model Class", model_state="my_model_state.json"))) See for details Args: model_class: class of the XGBoost model that defines the model structure. model_state: path to a json file that contains the model's configuration. inference_fn: the inference function to use during RunInference. default=default_xgboost_inference_fn min_batch_size: optional. the minimum batch size to use when batching inputs. max_batch_size: optional. the maximum batch size to use when batching inputs. max_batch_duration_secs: optional. the maximum amount of time to buffer a batch before emitting; used in streaming contexts. kwargs: 'env_vars' can be used to set environment variables before loading the model. **Supported Versions:** RunInference APIs in Apache Beam have been tested with XGBoost 1.6.0 and 1.7.0 XGBoost 1.0.0 introduced support for using JSON to save and load XGBoost models. XGBoost 1.6.0, additional support for Universal Binary JSON. It is recommended to use a model trained in XGBoost 1.6.0 or higher. While you should be able to load models created in older versions, there are no guarantees this will work as expected. This class is the superclass of all the various XGBoostModelhandlers and should not be instantiated directly. (See instead XGBoostModelHandlerNumpy, XGBoostModelHandlerPandas, etc.) """ self._model_class = model_class self._model_state = model_state self._inference_fn = inference_fn self._env_vars = kwargs.get('env_vars', {}) self._batching_kwargs = {} if min_batch_size is not None: self._batching_kwargs["min_batch_size"] = min_batch_size if max_batch_size is not None: self._batching_kwargs["max_batch_size"] = max_batch_size if max_batch_duration_secs is not None: self._batching_kwargs["max_batch_duration_secs"] = max_batch_duration_secs
[docs] def load_model(self) -> Union[xgboost.Booster, xgboost.XGBModel]: model = self._model_class() model_state_file_handler =, 'rb') model_state_bytes = # Convert into a bytearray so that the # model state can be loaded in XGBoost model_state_bytearray = bytearray(model_state_bytes) model.load_model(model_state_bytearray) return model
[docs] def get_metrics_namespace(self) -> str: return 'BeamML_XGBoost'
[docs] def batch_elements_kwargs(self) -> Mapping[str, Any]: return self._batching_kwargs
[docs] class XGBoostModelHandlerNumpy(XGBoostModelHandler[numpy.ndarray, PredictionResult, Union[xgboost.Booster, xgboost.XGBModel]]): """Implementation of the ModelHandler interface for XGBoost using numpy arrays as input. Example Usage:: pcoll | RunInference( XGBoostModelHandlerNumpy( model_class="XGBoost Model Class", model_state="my_model_state.json"))) Args: model_class: class of the XGBoost model that defines the model structure. model_state: path to a json file that contains the model's configuration. inference_fn: the inference function to use during RunInference. default=default_xgboost_inference_fn """
[docs] def run_inference( self, batch: Sequence[numpy.ndarray], model: Union[xgboost.Booster, xgboost.XGBModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """Runs inferences on a batch of 2d numpy arrays. Args: batch: A sequence of examples as 2d numpy arrays. Each row in an array is a single example. The dimensions must match the dimensions of the data used to train the model. model: XGBoost booster or XBGModel (sklearn interface). Must implement predict(X). Where the parameter X is a 2d numpy array. inference_args: Any additional arguments for an inference. Returns: An Iterable of type PredictionResult. """ return self._inference_fn(batch, model, inference_args)
[docs] def get_num_bytes(self, batch: Sequence[numpy.ndarray]) -> int: """ Returns: The number of bytes of data for a batch. """ return sum(sys.getsizeof(element) for element in batch)
[docs] class XGBoostModelHandlerPandas(XGBoostModelHandler[pandas.DataFrame, PredictionResult, Union[xgboost.Booster, xgboost.XGBModel]]): """Implementation of the ModelHandler interface for XGBoost using pandas dataframes as input. Example Usage:: pcoll | RunInference( XGBoostModelHandlerPandas( model_class="XGBoost Model Class", model_state="my_model_state.json"))) Args: model_class: class of the XGBoost model that defines the model structure. model_state: path to a json file that contains the model's configuration. inference_fn: the inference function to use during RunInference. default=default_xgboost_inference_fn """
[docs] def run_inference( self, batch: Sequence[pandas.DataFrame], model: Union[xgboost.Booster, xgboost.XGBModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """Runs inferences on a batch of pandas dataframes. Args: batch: A sequence of examples as pandas dataframes. Each row in a dataframe is a single example. The dimensions must match the dimensions of the data used to train the model. model: XGBoost booster or XBGModel (sklearn interface). Must implement predict(X). Where the parameter X is a pandas dataframe. inference_args: Any additional arguments for an inference. Returns: An Iterable of type PredictionResult. """ return self._inference_fn(batch, model, inference_args)
[docs] def get_num_bytes(self, batch: Sequence[pandas.DataFrame]) -> int: """ Returns: The number of bytes of data for a batch of Numpy arrays. """ return sum(df.memory_usage(deep=True).sum() for df in batch)
[docs] class XGBoostModelHandlerSciPy(XGBoostModelHandler[scipy.sparse.csr_matrix, PredictionResult, Union[xgboost.Booster, xgboost.XGBModel]]): """ Implementation of the ModelHandler interface for XGBoost using scipy matrices as input. Example Usage:: pcoll | RunInference( XGBoostModelHandlerSciPy( model_class="XGBoost Model Class", model_state="my_model_state.json"))) Args: model_class: class of the XGBoost model that defines the model structure. model_state: path to a json file that contains the model's configuration. inference_fn: the inference function to use during RunInference. default=default_xgboost_inference_fn """
[docs] def run_inference( self, batch: Sequence[scipy.sparse.csr_matrix], model: Union[xgboost.Booster, xgboost.XGBModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """Runs inferences on a batch of SciPy sparse matrices. Args: batch: A sequence of examples as Scipy sparse matrices. The dimensions must match the dimensions of the data used to train the model. model: XGBoost booster or XBGModel (sklearn interface). Must implement predict(X). Where the parameter X is a SciPy sparse matrix. inference_args: Any additional arguments for an inference. Returns: An Iterable of type PredictionResult. """ return self._inference_fn(batch, model, inference_args)
[docs] def get_num_bytes(self, batch: Sequence[scipy.sparse.csr_matrix]) -> int: """ Returns: The number of bytes of data for a batch. """ return sum(sys.getsizeof(element) for element in batch)
[docs] class XGBoostModelHandlerDatatable(XGBoostModelHandler[datatable.Frame, PredictionResult, Union[xgboost.Booster, xgboost.XGBModel]] ): """Implementation of the ModelHandler interface for XGBoost using datatable dataframes as input. Example Usage:: pcoll | RunInference( XGBoostModelHandlerDatatable( model_class="XGBoost Model Class", model_state="my_model_state.json"))) Args: model_class: class of the XGBoost model that defines the model structure. model_state: path to a json file that contains the model's configuration. inference_fn: the inference function to use during RunInference. default=default_xgboost_inference_fn """
[docs] def run_inference( self, batch: Sequence[datatable.Frame], model: Union[xgboost.Booster, xgboost.XGBModel], inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """Runs inferences on a batch of datatable dataframe. Args: batch: A sequence of examples as datatable dataframes. Each row in a dataframe is a single example. The dimensions must match the dimensions of the data used to train the model. model: XGBoost booster or XBGModel (sklearn interface). Must implement predict(X). Where the parameter X is a datatable dataframe. inference_args: Any additional arguments for an inference. Returns: An Iterable of type PredictionResult. """ return self._inference_fn(batch, model, inference_args)
[docs] def get_num_bytes(self, batch: Sequence[datatable.Frame]) -> int: """ Returns: The number of bytes of data for a batch. """ return sum(sys.getsizeof(element) for element in batch)