apache_beam.ml.rag.enrichment.bigquery_vector_search module

class apache_beam.ml.rag.enrichment.bigquery_vector_search.BigQueryVectorSearchParameters(project: str, table_name: str, embedding_column: str, columns: List[str], neighbor_count: int, metadata_restriction_template: str | None = None, distance_type: str | None = None, options: Dict[str, Any] | None = None)

Bases: object

Parameters for configuring BigQuery vector similarity search.

This class is used by BigQueryVectorSearchEnrichmentHandler to perform vector similarity search using BigQuery’s VECTOR_SEARCH function. The handler processes Chunk objects that contain an Embedding and returns similar vectors from a BigQuery table.

BigQueryVectorSearchEnrichmentHandler is used with the Enrichment transform to enrich Chunks with similar content from a vector database. For example:

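>>> from apache_beam.ml.rag.enrichment.bigquery_vector_search import (
...     BigQueryVectorSearchEnrichmentHandler,
...     BigQueryVectorSearchParameters,
... )
>>> from apache_beam.ml.transforms.base import MLTransform
>>> from apache_beam.transforms.enrichment import Enrichment
>>>
>>> # Create search parameters ('project' is a required field)
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content'],
...     neighbor_count=5
... )
>>> # Use in pipeline; 'chunks' is an existing PCollection of Chunk objects
>>> enriched = (
...     chunks
...     | "Generate Embeddings" >> MLTransform(...)
...     | "Find Similar" >> Enrichment(
...         BigQueryVectorSearchEnrichmentHandler(
...             vector_search_parameters=params
...         )
...     )
... )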

BigQueryVectorSearchParameters encapsulates the configuration needed to perform vector similarity search using BigQuery’s VECTOR_SEARCH function. It handles formatting the query with proper embedding vectors and metadata restrictions.

Example with flattened metadata column:

Table schema:

embedding: ARRAY<FLOAT64>  # Vector embedding
content: STRING           # Document content
language: STRING          # Direct metadata column

Code:

>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'language'],
...     neighbor_count=5,
...     # For column 'language', value comes from
...     # chunk.metadata['language']
...     metadata_restriction_template="language = '{language}'"
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE language = 'en'
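
The placeholder substitution described above can be pictured with plain Python string formatting. This is only a sketch: `render_restriction` is a hypothetical helper that is not part of apache_beam, and the real handler's substitution (including any escaping it applies) may differ.

```python
from typing import Any, Dict


def render_restriction(template: str, metadata: Dict[str, Any]) -> str:
    """Fill '{key}' placeholders in a restriction template from chunk metadata.

    Hypothetical helper for illustration only; not part of apache_beam.
    """
    # str.format raises KeyError if the template names a key that the
    # chunk metadata does not contain.
    return "WHERE " + template.format(**metadata)


clause = render_restriction("language = '{language}'", {"language": "en"})
print(clause)
```

Metadata keys that the template does not mention are simply ignored by `str.format`, so a chunk may carry more metadata than the filter uses.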

Example with nested repeated metadata:

Table schema:

embedding: ARRAY<FLOAT64>  # Vector embedding
content: STRING           # Document content
metadata: ARRAY<STRUCT<    # Nested repeated metadata
  key: STRING,
  value: STRING
>>

Code:

>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'metadata'],
...     neighbor_count=5,
...     # check_metadata(field_name, key_to_search, value_from_chunk)
...     metadata_restriction_template=(
...         "check_metadata(metadata, 'language', '{language}')"
...     )
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE check_metadata(metadata, 'language', 'en')
>>> # Searches for {key: 'language', value: 'en'} in metadata array
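
To make the matching rule concrete, the following pure-Python model mirrors what the comments above describe: a row passes the filter when its metadata array contains an entry with the given key and value. The function name `check_metadata_py` is hypothetical, and this is an assumption about the semantics only; the real check runs as SQL inside BigQuery.

```python
from typing import Dict, List


def check_metadata_py(
    metadata: List[Dict[str, str]], key: str, value: str) -> bool:
    """Return True if any {key, value} entry in the array matches.

    Hypothetical Python model of the SQL-side check; illustration only.
    """
    return any(
        entry["key"] == key and entry["value"] == value
        for entry in metadata)


# One row's metadata column, modeled as a list of key/value structs.
row_metadata = [
    {"key": "language", "value": "en"},
    {"key": "source", "value": "web"},
]
matches_en = check_metadata_py(row_metadata, "language", "en")
matches_fr = check_metadata_py(row_metadata, "language", "fr")
print(matches_en, matches_fr)
```
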
Parameters:
  • project – GCP project ID containing the BigQuery dataset

  • table_name – Fully qualified BigQuery table name containing vectors.

  • embedding_column – Column name containing the embedding vectors.

  • columns – List of columns to retrieve from matched vectors.

  • neighbor_count – Number of similar vectors to return (top-k).

  • metadata_restriction_template –

    Template string for filtering vectors. Two formats are supported:

    1. For flattened metadata columns: column_name = '{metadata_key}' where column_name is the BigQuery column and metadata_key is used to get the value from chunk.metadata[metadata_key].

    2. For nested repeated metadata (ARRAY<STRUCT<key,value>>): check_metadata(field_name, 'key_to_match', '{metadata_key}') where field_name is the ARRAY<STRUCT> column in BigQuery, key_to_match is the literal key to search for in the array, and metadata_key is used to get the value from chunk.metadata[metadata_key].

    Multiple conditions can be combined using AND/OR operators. For example:

    >>> # Combine metadata check with column filter
    >>> template = (
    ...     "check_metadata(metadata, 'language', '{language}') "
    ...     "AND source = '{source}'"
    ... )
    >>> # When chunk.metadata = {'language': 'en', 'source': 'web'}
    >>> # Generates: WHERE
    >>> #             check_metadata(metadata, 'language', 'en')
    >>> #           AND source = 'web'
    

  • distance_type – Optional distance metric to use. Supported values: COSINE (default), EUCLIDEAN, or DOT_PRODUCT.

  • options – Optional dictionary of additional VECTOR_SEARCH options.
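
As a rough illustration of how the two optional settings could end up in a query, the sketch below renders them as named VECTOR_SEARCH arguments. It relies on two assumptions that this page does not confirm: that the distance type is validated against the three supported values, and that the options dictionary is serialized to the JSON string that BigQuery's VECTOR_SEARCH accepts for its `options` argument. `render_optional_args` is a hypothetical helper, and `use_brute_force` is shown only as a sample BigQuery option.

```python
import json
from typing import Any, Dict, Optional

SUPPORTED_DISTANCE_TYPES = {"COSINE", "EUCLIDEAN", "DOT_PRODUCT"}


def render_optional_args(
    distance_type: Optional[str] = None,
    options: Optional[Dict[str, Any]] = None) -> str:
    """Render optional VECTOR_SEARCH arguments (hypothetical helper)."""
    parts = []
    if distance_type is not None:
        # Assumed validation step; the real class may not check this.
        if distance_type not in SUPPORTED_DISTANCE_TYPES:
            raise ValueError(f"Unsupported distance type: {distance_type}")
        parts.append(f"distance_type => '{distance_type}'")
    if options:
        # VECTOR_SEARCH takes its options as a JSON-formatted string.
        parts.append(f"options => '{json.dumps(options)}'")
    return ", ".join(parts)


rendered = render_optional_args("COSINE", {"use_brute_force": True})
print(rendered)
```
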

project: str
table_name: str
embedding_column: str
columns: List[str]
neighbor_count: int
metadata_restriction_template: str | None = None
distance_type: str | None = None
options: Dict[str, Any] | None = None
format_query(chunks: List[Chunk]) → str

Format the vector search query template.
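
The exact SQL that format_query produces for a batch of chunks is not shown on this page. As a simplified sketch of the general shape of a BigQuery VECTOR_SEARCH statement for a single query embedding, consider the hypothetical builder below. `build_vector_search_query` and its parameters are assumptions made for illustration; the real method handles multiple chunks and may structure the query differently.

```python
from typing import List, Optional


def build_vector_search_query(
    table_name: str,
    embedding_column: str,
    columns: List[str],
    neighbor_count: int,
    query_embedding: List[float],
    where_clause: Optional[str] = None,
) -> str:
    """Assemble a simplified VECTOR_SEARCH statement for one embedding.

    Hypothetical helper for illustration; not the Beam implementation.
    """
    # VECTOR_SEARCH exposes matched rows under the `base` struct and the
    # computed distance under `distance`.
    base_columns = ", ".join(f"base.{c}" for c in columns)
    base_select = f"SELECT * FROM `{table_name}`"
    if where_clause:
        # Pre-filter the base table with the rendered metadata restriction.
        base_select += f" WHERE {where_clause}"
    return (
        f"SELECT {base_columns}, distance\n"
        f"FROM VECTOR_SEARCH(\n"
        f"  ({base_select}),\n"
        f"  '{embedding_column}',\n"
        f"  (SELECT {query_embedding} AS {embedding_column}),\n"
        f"  top_k => {neighbor_count}\n"
        f")"
    )


query = build_vector_search_query(
    table_name="project.dataset.embeddings",
    embedding_column="embedding",
    columns=["content", "language"],
    neighbor_count=5,
    query_embedding=[0.1, 0.2, 0.3],
    where_clause="language = 'en'",
)
print(query)
```
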

class apache_beam.ml.rag.enrichment.bigquery_vector_search.BigQueryVectorSearchEnrichmentHandler(vector_search_parameters: BigQueryVectorSearchParameters, *, min_batch_size: int = 1, max_batch_size: int = 1000, **kwargs)

Bases: EnrichmentSourceHandler[Chunk | List[Chunk], List[Tuple[Chunk, Dict[str, Any]]]]

Enrichment handler that performs vector similarity search using BigQuery.

This handler enriches Chunks by finding similar vectors in a BigQuery table using the VECTOR_SEARCH function. It supports batching requests for efficiency and preserves the original Chunk metadata while adding the search results.

Example

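>>> import apache_beam as beam
>>> from apache_beam.ml.rag.enrichment.bigquery_vector_search import (
...     BigQueryVectorSearchEnrichmentHandler,
...     BigQueryVectorSearchParameters,
... )
>>> from apache_beam.ml.rag.types import Chunk, Content, Embedding
>>> from apache_beam.transforms.enrichment import Enrichment
>>>
>>> # Configure vector search ('project' is a required field)
>>> params = BigQueryVectorSearchParameters(
...     project='my-project',
...     table_name='project.dataset.embeddings',
...     embedding_column='embedding',
...     columns=['content', 'metadata'],
...     neighbor_count=2,
...     metadata_restriction_template="language = '{language}'"
... )
>>>
>>> # Create handler
>>> handler = BigQueryVectorSearchEnrichmentHandler(
...     vector_search_parameters=params,
...     min_batch_size=100,
...     max_batch_size=1000
... )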
>>>
>>> # Use in pipeline
>>> with beam.Pipeline() as p:
...     enriched = (
...         p
...         | beam.Create([
...             Chunk(
...                 id='query1',
...                 embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
...                 content=Content(text='test query'),
...                 metadata={'language': 'en'}
...             )
...         ])
...         | Enrichment(handler)
...     )
Parameters:
  • vector_search_parameters – Configuration for the vector search query

  • min_batch_size – Minimum number of chunks to batch before processing

  • max_batch_size – Maximum number of chunks to process in one batch

  • **kwargs – Additional arguments passed to bigquery.Client

The handler will:

  1. Batch incoming chunks according to the batch size parameters.
  2. Format and execute a vector search query for each batch.
  3. Join the results back to the original chunks.
  4. Return tuples of (original_chunk, search_results).
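
The four steps above can be sketched in plain Python without Beam or BigQuery. Everything in this sketch is a stand-in: chunks are modeled as dictionaries with an `id` key, `fake_search` replaces the BigQuery call, and `batch` and `join_results` are hypothetical helpers. It assumes results are matched to chunks by id, which this page does not state explicitly.

```python
from typing import Any, Dict, Iterator, List, Tuple


def batch(items: List[Any], max_batch_size: int) -> Iterator[List[Any]]:
    """Yield consecutive groups of at most max_batch_size items."""
    for start in range(0, len(items), max_batch_size):
        yield items[start:start + max_batch_size]


def fake_search(batch_chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Stand-in for the BigQuery call; returns one result row per chunk id."""
    return [
        {"id": c["id"], "chunks": [f"neighbor-of-{c['id']}"]}
        for c in batch_chunks
    ]


def join_results(
    chunks: List[Dict[str, Any]], rows: List[Dict[str, Any]]
) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]:
    """Pair each chunk with its result row; chunks with no match get {}."""
    by_id = {row["id"]: row for row in rows}
    return [(chunk, by_id.get(chunk["id"], {})) for chunk in chunks]


chunks = [{"id": f"q{i}"} for i in range(5)]
enriched = []
for group in batch(chunks, max_batch_size=2):
    # One query per batch, then join the rows back to that batch's chunks.
    enriched.extend(join_results(group, fake_search(group)))
print(len(enriched))
```

Issuing one query per batch rather than one per chunk is what makes the batch size parameters matter for efficiency.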

batch_elements_kwargs() → Dict[str, int]

Returns kwargs for beam.BatchElements.

apache_beam.ml.rag.enrichment.bigquery_vector_search.join_fn(left: Embedding, right: Dict[str, Any]) → Embedding