apache_beam.ml.inference.huggingface_inference module

class apache_beam.ml.inference.huggingface_inference.HuggingFaceModelHandlerTensor(model_uri: str, model_class: transformers.AutoModel | transformers.TFAutoModel, device: str = 'CPU', *, inference_fn: Callable[[...], Iterable[PredictionResult]] | None = None, load_model_args: Dict[str, Any] | None = None, min_batch_size: int | None = None, max_batch_size: int | None = None, max_batch_duration_secs: int | None = None, large_model: bool = False, model_copies: int | None = None, **kwargs)[source]

Bases: ModelHandler[tensorflow.Tensor | torch.Tensor, PredictionResult, AutoModel | TFAutoModel]

Implementation of the ModelHandler interface for Hugging Face models that take Tensors as input, with a PyTorch or TensorFlow backend.

The model framework (PyTorch or TensorFlow) is determined automatically from the type of the input tensors.

Example Usage:
  pcoll | RunInference(HuggingFaceModelHandlerTensor(
    model_uri="bert-base-uncased",
    model_class=AutoModelForMaskedLM))

A fuller pipeline sketch appears after the parameter list below.

Parameters:
  • model_uri (str) – path to the pretrained model on the Hugging Face Models Hub.

  • model_class – the model class with which to load the model from model_uri.

  • device – for torch tensors, the device on which you wish to run the model. Defaults to CPU.

  • inference_fn – the inference function to use during RunInference. Default is _run_inference_torch_keyed_tensor or _run_inference_tensorflow_keyed_tensor depending on the input type.

  • load_model_args (Dict[str, Any]) – (Optional) keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None.

  • min_batch_size – the minimum batch size to use when batching inputs.

  • max_batch_size – the maximum batch size to use when batching inputs.

  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.

  • large_model – set to True if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.

  • model_copies – the exact number of model copies that you would like loaded onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.

  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: HuggingFaceModelHandler supports transformers>=4.18.0.
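
A minimal end-to-end sketch of how this handler might fit into a pipeline. The tokenizer, the masked input sentence, and the choice to feed raw input_ids tensors are illustrative assumptions rather than requirements of the API:

  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.huggingface_inference import (
      HuggingFaceModelHandlerTensor)
  from transformers import AutoModelForMaskedLM, AutoTokenizer

  tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

  def to_tensor(sentence):
      # return_tensors="pt" yields torch Tensors, so the handler selects the
      # PyTorch code path; drop the leading batch dimension so each element
      # is a single unbatched example.
      return tokenizer(sentence, return_tensors="pt")["input_ids"].squeeze(0)

  model_handler = HuggingFaceModelHandlerTensor(
      model_uri="bert-base-uncased",
      model_class=AutoModelForMaskedLM,
      max_batch_size=8)

  with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(["Apache Beam is a unified [MASK] processing model."])
          | beam.Map(to_tensor)
          | RunInference(model_handler)
          | beam.Map(print))

Each output element is a PredictionResult pairing the input tensor with the corresponding model output.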

load_model()[source]

Loads and initializes the model for processing.

run_inference(batch: Sequence[tensorflow.Tensor | torch.Tensor], model: transformers.AutoModel | transformers.TFAutoModel, inference_args: Dict[str, Any] | None = None) Iterable[PredictionResult][source]

Runs inferences on a batch of Tensors and returns an Iterable of PredictionResults.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Tensors. These Tensors should be batchable, as this method will call tf.stack()/torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.

  • model – A Tensorflow/PyTorch model.

  • inference_args (Dict[str, Any]) – Non-batchable arguments required as inputs to the model's inference function. Unlike Tensors in batch, these parameters will not be dynamically batched. See the sketch after the Returns section below for how to pass them through RunInference.

Returns:

An Iterable of type PredictionResult.
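
Non-batchable keyword arguments are supplied once on the RunInference transform rather than per element. A short sketch continuing the example above, assuming an existing handler and tokenized PCollection named model_handler and tokenized_tensors, and assuming the underlying model accepts an output_hidden_states keyword:

  # inference_args are forwarded unchanged to every model call; they are not
  # stacked or batched with the input tensors.
  predictions = (
      tokenized_tensors
      | RunInference(
          model_handler,
          inference_args={"output_hidden_states": True}))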

update_model_path(model_path: str | None = None)[source]

get_num_bytes(batch: Sequence[tensorflow.Tensor | torch.Tensor]) int[source]

Returns:

The number of bytes of data for the Tensors batch.

batch_elements_kwargs()[source]

share_model_across_processes() bool[source]

model_copies() int[source]

get_metrics_namespace() str[source]

Returns:

A namespace for metrics collected by the RunInference transform.

class apache_beam.ml.inference.huggingface_inference.HuggingFaceModelHandlerKeyedTensor(model_uri: str, model_class: transformers.AutoModel | transformers.TFAutoModel, framework: str, device: str = 'CPU', *, inference_fn: Callable[[...], Iterable[PredictionResult]] | None = None, load_model_args: Dict[str, Any] | None = None, min_batch_size: int | None = None, max_batch_size: int | None = None, max_batch_duration_secs: int | None = None, large_model: bool = False, model_copies: int | None = None, **kwargs)[source]

Bases: ModelHandler[Dict[str, tensorflow.Tensor | torch.Tensor], PredictionResult, AutoModel | TFAutoModel]

Implementation of the ModelHandler interface for Hugging Face models that take Keyed Tensors as input, with a PyTorch or TensorFlow backend.

Example Usage:
  pcoll | RunInference(HuggingFaceModelHandlerKeyedTensor(
    model_uri="bert-base-uncased",
    model_class=AutoModelForMaskedLM,
    framework='pt'))

A fuller pipeline sketch appears after the parameter list below.

Parameters:
  • model_uri (str) – path to the pretrained model on the Hugging Face Models Hub.

  • model_class – the model class with which to load the model from model_uri.

  • framework (str) – framework to use for the model. 'tf' for TensorFlow and 'pt' for PyTorch.

  • device – for torch tensors, the device on which you wish to run the model. Defaults to CPU.

  • inference_fn – the inference function to use during RunInference. Default is _run_inference_torch_keyed_tensor or _run_inference_tensorflow_keyed_tensor depending on the input type.

  • load_model_args (Dict[str, Any]) – (Optional) Keyword arguments to provide load options while loading models from Hugging Face Hub. Defaults to None.

  • min_batch_size – the minimum batch size to use when batching inputs.

  • max_batch_size – the maximum batch size to use when batching inputs.

  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.

  • large_model – set to True if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.

  • model_copies – the exact number of model copies that you would like loaded onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.

  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: HuggingFaceModelHandler supports transformers>=4.18.0.
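
A minimal sketch of keyed-tensor usage, in which a tokenizer produces the Dict[str, Tensor] elements the handler expects. The tokenizer, the input sentence, and the squeeze of the batch dimension are illustrative assumptions:

  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.huggingface_inference import (
      HuggingFaceModelHandlerKeyedTensor)
  from transformers import AutoModelForMaskedLM, AutoTokenizer

  tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

  def tokenize(sentence):
      # The tokenizer returns a dict such as
      # {'input_ids': Tensor, 'attention_mask': Tensor}; squeeze the leading
      # batch dimension so each key maps to one unbatched example tensor.
      tokens = tokenizer(sentence, return_tensors="pt")
      return {key: value.squeeze(0) for key, value in tokens.items()}

  model_handler = HuggingFaceModelHandlerKeyedTensor(
      model_uri="bert-base-uncased",
      model_class=AutoModelForMaskedLM,
      framework="pt")

  with beam.Pipeline() as p:
      keyed_predictions = (
          p
          | beam.Create(["Apache Beam runs on many [MASK] engines."])
          | beam.Map(tokenize)
          | RunInference(model_handler))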

load_model()[source]

Loads and initializes the model for processing.

run_inference(batch: Sequence[Dict[str, tensorflow.Tensor | torch.Tensor]], model: transformers.AutoModel | transformers.TFAutoModel, inference_args: Dict[str, Any] | None = None) Iterable[PredictionResult][source]

Runs inferences on a batch of Keyed Tensors and returns an Iterable of PredictionResults.

This method stacks the list of Tensors in a vectorized format to optimize the inference call.

Parameters:
  • batch – A sequence of Keyed Tensors. These Tensors should be batchable, as this method will call tf.stack()/torch.stack() and pass in batched Tensors with dimensions (batch_size, n_features, etc.) into the model’s predict() function.

  • model – A Tensorflow/PyTorch model.

  • inference_args – Non-batchable arguments required as inputs to the model’s inference function. Unlike Tensors in batch, these parameters will not be dynamically batched.

Returns:

An Iterable of type PredictionResult.
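
The inference field of each PredictionResult carries the per-example model output; for a masked-LM model this is a dict-like value with a logits entry. A post-processing sketch continuing the example above, assuming the keyed_predictions PCollection from that sketch (the argmax decoding is only an illustration):

  import torch

  def top_token_ids(result):
      # result.example is the keyed input dict; result.inference holds the
      # model output for that single example, e.g. {'logits': Tensor}.
      logits = result.inference["logits"]
      return torch.argmax(logits, dim=-1)

  # Inside the same pipeline as the sketch above:
  decoded = keyed_predictions | beam.Map(top_token_ids)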

update_model_path(model_path: str | None = None)[source]

get_num_bytes(batch: Sequence[tensorflow.Tensor | torch.Tensor]) int[source]

Returns:

The number of bytes of data for the Tensors batch.

batch_elements_kwargs()[source]

share_model_across_processes() bool[source]

model_copies() int[source]

get_metrics_namespace() str[source]

Returns:

A namespace for metrics collected by the RunInference transform.

class apache_beam.ml.inference.huggingface_inference.HuggingFacePipelineModelHandler(task: str | ~apache_beam.ml.inference.huggingface_inference.PipelineTask = '', model: str = '', *, device: str | None = None, inference_fn: ~typing.Callable[[~typing.Sequence[str], transformers.Pipeline, ~typing.Dict[str, ~typing.Any] | None], ~typing.Iterable[~apache_beam.ml.inference.base.PredictionResult]] = <function _default_pipeline_inference_fn>, load_pipeline_args: ~typing.Dict[str, ~typing.Any] | None = None, min_batch_size: int | None = None, max_batch_size: int | None = None, max_batch_duration_secs: int | None = None, large_model: bool = False, model_copies: int | None = None, **kwargs)[source]

Bases: ModelHandler[str, PredictionResult, Pipeline]

Implementation of the ModelHandler interface for Hugging Face Pipelines.

Example Usage:
  pcoll | RunInference(HuggingFacePipelineModelHandler(
    task="fill-mask"))

A fuller pipeline sketch appears after the parameter list below.

Parameters:
  • task (str or enum.Enum) – a task supported by Hugging Face Pipelines. Accepts either a task string or a PipelineTask enum member.

  • model (str) – model ID of a pretrained model on the Hugging Face Models Hub, used to run a custom model for the chosen task. If the model already defines the task, there is no need to specify the task parameter. Pass the model ID string here rather than an actual model object. Model-specific kwargs for from_pretrained(…, **model_kwargs) can be specified with model_kwargs using load_pipeline_args.

    Example Usage:
      model_handler = HuggingFacePipelineModelHandler(
        task="text-generation",
        model="meta-llama/Llama-2-7b-hf",
        load_pipeline_args={'model_kwargs': {'quantization_map': config}})

  • device (str) – the device ("CPU" or "GPU") on which you wish to run the pipeline. Defaults to GPU; if a GPU is not available, it falls back to CPU. You can also use advanced options such as device_map, passed as a key-value pair through load_pipeline_args just as you would with a regular Hugging Face pipeline, e.g. load_pipeline_args={'device_map': 'auto'}.

  • inference_fn – the inference function to use during RunInference. Default is _default_pipeline_inference_fn.

  • load_pipeline_args (Dict[str, Any]) – keyword arguments to provide load options while loading pipelines from Hugging Face. Defaults to None.

  • min_batch_size – the minimum batch size to use when batching inputs.

  • max_batch_size – the maximum batch size to use when batching inputs.

  • max_batch_duration_secs – the maximum amount of time to buffer a batch before emitting; used in streaming contexts.

  • large_model – set to True if your model is large enough to run into memory pressure if you load multiple copies. Given a model that consumes N memory and a machine with W cores and M memory, you should set this to True if N*W > M.

  • model_copies – the exact number of model copies that you would like loaded onto your machine. This can be useful if you know your CPU or GPU capacity exactly and want to maximize resource utilization.

  • kwargs – ‘env_vars’ can be used to set environment variables before loading the model.

Supported Versions: HuggingFacePipelineModelHandler supports transformers>=4.18.0.
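
A minimal end-to-end sketch for the pipeline handler. The prompt and the post-processing lambda are illustrative assumptions; for a fill-mask task the underlying Hugging Face pipeline returns a list of candidate completions per input:

  import apache_beam as beam
  from apache_beam.ml.inference.base import RunInference
  from apache_beam.ml.inference.huggingface_inference import (
      HuggingFacePipelineModelHandler)

  model_handler = HuggingFacePipelineModelHandler(
      task="fill-mask",
      model="bert-base-uncased")

  with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(["Paris is the [MASK] of France."])
          | RunInference(model_handler)
          # result.example is the input string; result.inference is the raw
          # output of the Hugging Face pipeline for that example.
          | beam.Map(lambda result: result.inference)
          | beam.Map(print))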

load_model()[source]

Loads and initializes the pipeline for processing.

run_inference(batch: Sequence[str], pipeline: transformers.Pipeline, inference_args: Dict[str, Any] | None = None) Iterable[PredictionResult][source]

Runs inferences on a batch of examples passed as string resources. These can be sentences, or paths to image or audio files.

Parameters:
  • batch – A sequence of string resources.

  • pipeline – A Hugging Face Pipeline.

  • inference_args – Non-batchable arguments required as inputs to the model’s inference function.

Returns:

An Iterable of type PredictionResult.

update_model_path(model_path: str | None = None)[source]

Updates the pretrained model used by the Hugging Face Pipeline task. Make sure that the new model performs the same task as the initial model.

Parameters:

model_path (str) – (Optional) Path to the new trained model from Hugging Face. Defaults to None.

get_num_bytes(batch: Sequence[str]) int[source]

Returns:

The number of bytes of input batch elements.

batch_elements_kwargs()[source]

share_model_across_processes() bool[source]

model_copies() int[source]

get_metrics_namespace() str[source]

Returns:

A namespace for metrics collected by the RunInference transform.