apache_beam.utils.shared module

Shared class.

Shared is a helper class for managing a single instance of an object shared by multiple threads within the same process. Instances of Shared are serializable objects that can be shared by all threads of each worker process. A Shared object encapsulates a weak reference to a singleton instance of the shared resource. The singleton is lazily initialized by calls to Shared.acquire().

Example usage:

To share a very large list across all threads of each worker in a DoFn:

# Several built-in types such as list and dict do not directly support weak
# references but can add support through subclassing:
# https://docs.python.org/3/library/weakref.html
class WeakRefList(list):
  pass

class GetNthStringFn(beam.DoFn):
  def __init__(self):
    self._shared_handle = shared.Shared()

  def setup(self):
    # setup is a good place to initialize transient in-memory resources.
    def initialize_list():
      # Build the giant initial list.
      return WeakRefList([str(i) for i in range(1000000)])

    self._giant_list = self._shared_handle.acquire(initialize_list)

  def process(self, element):
    yield self._giant_list[element]

p = beam.Pipeline()
(p | beam.Create([2, 4, 6, 8])
   | beam.ParDo(GetNthStringFn()))

Real-world uses will typically involve using a side-input to a DoFn to initialize the shared resource in a way that can’t be done with just its constructor:

class RainbowTableLookupFn(beam.DoFn):
  def __init__(self):
    self._shared_handle = shared.Shared()

  def process(self, element, table_elements):
    def construct_table():
      # Construct the rainbow table from the table elements.
      # The table contains lines in the form "string::hash"
      result = {}
      for key, value in table_elements:
        result[value] = key
      return result

    rainbow_table = self._shared_handle.acquire(construct_table)
    unhashed_str = rainbow_table.get(element)
    if unhashed_str is not None:
      yield unhashed_str

p = beam.Pipeline()
reverse_hash_table = p | "ReverseHashTable" >> beam.Create([
                ('a', '0cc175b9c0f1b6a831c399e269772661'),
                ('b', '92eb5ffee6ae2fec3ad71c777531578f'),
                ('c', '4a8a08f09d37b73795649038408b5f33'),
                ('d', '8277e0910d750195b448797616e091ad')])
unhashed = (p
            | 'Hashes' >> beam.Create([
                '0cc175b9c0f1b6a831c399e269772661',
                '8277e0910d750195b448797616e091ad'])
            | 'Unhash' >> beam.ParDo(
                 RainbowTableLookupFn(), reverse_hash_table))
class apache_beam.utils.shared.Shared[source]

Bases: object

Handle for managing shared per-process objects.

Each instance of a Shared object represents a distinct handle to a distinct object. Example usage is described in the file comment of shared.py.

This object has the following limitations: * A shared object won’t be GC’ed if there isn’t another acquire called for a different shared object. * Each stage can only use exactly one Shared token, otherwise only one Shared token, NOT NECESSARILY THE LATEST, will be “kept-alive”. * If there are two different stages using separate Shared tokens, but which get fused together, only one Shared token will be “kept-alive”.

(See documentation of _SharedMap for details.)

acquire(constructor_fn: Callable[[], Any], tag: Any | None = None) Any[source]

Acquire a reference to the object associated with this Shared handle.

Parameters:
  • constructor_fn – function that initialises / constructs the object if not present in the cache. This function should take no arguments. It should return an initialised object, or None if the object could not be initialised / constructed.

  • tag – an optional indentifier to store with the cached object. If subsequent calls to acquire use different tags, the object will be reloaded rather than returned from cache.

Returns:

A reference to an initialised object, either from the cache, or newly-constructed.