apache_beam.ml.inference.pytorch_inference module

class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerTensor(state_dict_path: str | None = None, model_class: ~typing.Callable[[...], torch.nn.Module] | None = None, model_params: ~typing.Dict[str, ~typing.Any] | None = None, device: str = 'CPU', *, inference_fn: ~typing.Callable[[~typing.Sequence[torch.Tensor], torch.nn.Module, torch.device, ~typing.Dict[str, ~typing.Any] | None, str | None], ~typing.Iterable[~apache_beam.ml.inference.base.PredictionResult]] = <function default_tensor_inference_fn>, torch_script_model_path: str | None = None, min_batch_size: int | None = None, max_batch_size: int | None = None, max_batch_duration_secs: int | None = None, large_model: bool = False, model_copies: int | None = None, load_model_args: ~typing.Dict[str, ~typing.Any] | None = None, **kwargs)[source]

Bases: ModelHandler[Tensor, PredictionResult, Module]

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::

  pcoll | RunInference(PytorchModelHandlerTensor(
      state_dict_path="my_uri",
      model_class="my_class"))

Example Usage for torchscript model::

  pcoll | RunInference(PytorchModelHandlerTensor(
      torch_script_model_path="my_uri"))

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
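
A fuller pipeline sketch, assuming a placeholder LinearRegression model class, a placeholder state dict path, and illustrative batching values (substitute your own)::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

  # Placeholder model class; use the class that matches your saved state dict.
  class LinearRegression(torch.nn.Module):
    def __init__(self, input_dim=1, output_dim=1):
      super().__init__()
      self.linear = torch.nn.Linear(input_dim, output_dim)

    def forward(self, x):
      return self.linear(x)

  model_handler = PytorchModelHandlerTensor(
      state_dict_path='gs://my-bucket/linear_regression.pt',  # placeholder path
      model_class=LinearRegression,
      model_params={'input_dim': 1, 'output_dim': 1},
      min_batch_size=10,   # optional batching controls, see the parameters below
      max_batch_size=100)

  with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([torch.Tensor([1.0]), torch.Tensor([2.0])])
        | RunInference(model_handler)
        | beam.Map(print))

Each output element is a PredictionResult holding the input Tensor and the model's inference for it.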

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.

  • model_class – class of the PyTorch model that defines the model structure.

  • model_params – A dictionary of arguments required to instantiate the model class.

  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the CPU will be used.

  • inference_fn – the inference function to use during RunInference. default=default_tensor_inference_fn. A sketch of a custom implementation appears after this parameter list.

  • torch_script_model_path – Path to the TorchScript model. If provided, the model will be loaded using torch.jit.load(), and the state_dict_path, model_class and model_params arguments will be disregarded.

  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.

  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Tensors.

  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.

  • large_model – set to True if your model is large enough to run into memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.

  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.

  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.

  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested with PyTorch 1.9 and 1.10.
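
The inference_fn hook accepts any callable matching the signature shown in the class constructor above. A minimal sketch of a custom function that mirrors the default stack-and-forward behavior (the name my_no_grad_inference_fn is a placeholder, not part of the library)::

  from typing import Any, Dict, Iterable, Optional, Sequence

  import torch
  from apache_beam.ml.inference.base import PredictionResult

  def my_no_grad_inference_fn(
      batch: Sequence[torch.Tensor],
      model: torch.nn.Module,
      device: torch.device,
      inference_args: Optional[Dict[str, Any]] = None,
      model_id: Optional[str] = None,
  ) -> Iterable[PredictionResult]:
    # Stack the per-element Tensors into one batched Tensor, move it to the
    # target device, and run forward() without tracking gradients.
    inference_args = inference_args or {}
    batched = torch.stack(batch).to(device)
    with torch.no_grad():
      predictions = model(batched, **inference_args)
    return [
        PredictionResult(example, inference)
        for example, inference in zip(batch, predictions)
    ]

Pass it to the handler via PytorchModelHandlerTensor(..., inference_fn=my_no_grad_inference_fn).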

load_model() torch.nn.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: str | None = None)[source]
run_inference(batch: Sequence[torch.Tensor], model: torch.nn.Module, inference_args: Dict[str, Any] | None = None) Iterable[PredictionResult][source]

Runs inferences on a batch of Tensors and returns an Iterable of Tensor Predictions.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.

  • model – A PyTorch model.

  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched (see the example below).

Returns:

An Iterable of type PredictionResult.
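
For example, if the model's forward() takes an extra keyword argument that should not be batched, it can be supplied through RunInference and is passed unchanged to every forward() call (the prediction_param_tensor name is hypothetical)::

  inference_args = {'prediction_param_tensor': torch.Tensor([1.0, 2.0, 3.0])}

  predictions = pcoll | RunInference(
      model_handler, inference_args=inference_args)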

get_num_bytes(batch: Sequence[torch.Tensor]) int[source]
Returns:

The number of bytes of data for a batch of Tensors.

get_metrics_namespace() str[source]
Returns:

A namespace for metrics collected by the RunInference transform.

validate_inference_args(inference_args: Dict[str, Any] | None)[source]
batch_elements_kwargs()[source]
share_model_across_processes() bool[source]
model_copies() int[source]
class apache_beam.ml.inference.pytorch_inference.PytorchModelHandlerKeyedTensor(state_dict_path: str | None = None, model_class: ~typing.Callable[[...], torch.nn.Module] | None = None, model_params: ~typing.Dict[str, ~typing.Any] | None = None, device: str = 'CPU', *, inference_fn: ~typing.Callable[[~typing.Sequence[~typing.Dict[str, torch.Tensor]], torch.nn.Module, torch.device, ~typing.Dict[str, ~typing.Any] | None, str | None], ~typing.Iterable[~apache_beam.ml.inference.base.PredictionResult]] = <function default_keyed_tensor_inference_fn>, torch_script_model_path: str | None = None, min_batch_size: int | None = None, max_batch_size: int | None = None, max_batch_duration_secs: int | None = None, large_model: bool = False, model_copies: int | None = None, load_model_args: ~typing.Dict[str, ~typing.Any] | None = None, **kwargs)[source]

Bases: ModelHandler[Dict[str, Tensor], PredictionResult, Module]

Implementation of the ModelHandler interface for PyTorch.

Example Usage for torch model::

  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      state_dict_path="my_uri",
      model_class="my_class"))

Example Usage for torchscript model::

  pcoll | RunInference(PytorchModelHandlerKeyedTensor(
      torch_script_model_path="my_uri"))

NOTE: This API and its implementation are under development and do not provide backward compatibility guarantees.

See https://pytorch.org/tutorials/beginner/saving_loading_models.html for details
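
A fuller pipeline sketch, assuming a placeholder model whose forward() accepts named Tensor arguments matching the keys of each input dict (the class, keys, and path are illustrative only)::

  import apache_beam as beam
  import torch
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor

  # Placeholder model; substitute your own class and saved state dict.
  class TwoInputModel(torch.nn.Module):
    def __init__(self):
      super().__init__()
      self.linear = torch.nn.Linear(2, 1)

    def forward(self, k1, k2):
      return self.linear(torch.cat([k1, k2], dim=-1))

  model_handler = PytorchModelHandlerKeyedTensor(
      state_dict_path='gs://my-bucket/two_input_model.pt',  # placeholder path
      model_class=TwoInputModel)

  with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            {'k1': torch.Tensor([1.0]), 'k2': torch.Tensor([0.0])},
            {'k1': torch.Tensor([2.0]), 'k2': torch.Tensor([1.0])},
        ])
        | RunInference(model_handler)
        | beam.Map(print))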

Parameters:
  • state_dict_path – path to the saved dictionary of the model state.

  • model_class – class of the PyTorch model that defines the model structure.

  • model_params – A dictionary of arguments required to instantiate the model class.

  • device – the device on which you wish to run the model. If device = GPU, a GPU device will be used if one is available; otherwise, the CPU will be used.

  • inference_fn – the function to invoke on run_inference. default = default_keyed_tensor_inference_fn

  • torch_script_model_path – Path to the TorchScript model. If provided, the model will be loaded using torch.jit.load(), and the state_dict_path, model_class and model_params arguments will be disregarded.

  • min_batch_size – the minimum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.

  • max_batch_size – the maximum batch size to use when batching inputs. This batch will be fed into the inference_fn as a Sequence of Keyed Tensors.

  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.

  • large_model – set to True if your model is large enough to run into memory pressure when multiple copies are loaded. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.

  • model_copies – The exact number of models that you would like loaded onto your machine. This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization.

  • load_model_args – a dictionary of parameters passed to the torch.load function to specify custom config for loading models.

  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: RunInference APIs in Apache Beam have been tested on torch>=1.9.0,<1.14.0.

load_model() torch.nn.Module[source]

Loads and initializes a PyTorch model for processing.

update_model_path(model_path: str | None = None)[source]
run_inference(batch: Sequence[Dict[str, torch.Tensor]], model: torch.nn.Module, inference_args: Dict[str, Any] | None = None) Iterable[PredictionResult][source]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of Tensor Predictions.

For the same key across all examples, this method stacks all Tensor values in a vectorized format to optimize the inference call (illustrated in the sketch below).

Parameters:
  • batch – A sequence of keyed Tensors. These Tensors should be batchable, as this method will call torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s forward() function.

  • model – A PyTorch model.

  • inference_args – Non-batchable arguments required as inputs to the model’s forward() function. Unlike Tensors in batch, these parameters will not be dynamically batched.

Returns:

An Iterable of type PredictionResult.
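
The per-key stacking can be pictured with the following sketch (illustrative only, not the library's implementation)::

  import torch

  batch = [
      {'k1': torch.Tensor([1.0]), 'k2': torch.Tensor([0.0])},
      {'k1': torch.Tensor([2.0]), 'k2': torch.Tensor([1.0])},
  ]
  # Tensors sharing a key are stacked across all examples, gaining a leading
  # batch dimension; here each stacked value has shape (2, 1).
  key_to_batched_tensor = {
      key: torch.stack([example[key] for example in batch])
      for key in batch[0]
  }
  # The stacked Tensors are passed to forward() as keyword arguments, roughly
  # model(**key_to_batched_tensor, **(inference_args or {})).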

get_num_bytes(batch: Sequence[torch.Tensor]) int[source]
Returns:

The number of bytes of data for a batch of Dicts of Tensors.

get_metrics_namespace() str[source]
Returns:

A namespace for metrics collected by the RunInference transform.

validate_inference_args(inference_args: Dict[str, Any] | None)[source]
batch_elements_kwargs()[source]
share_model_across_processes() bool[source]
model_copies() int[source]