apache_beam.io.gcp.bigquery_file_loads module

Functionality to perform file loads into BigQuery for Batch and Streaming pipelines.

This sink is able to work around BigQuery load quotas and limitations: when destinations are dynamic, or when the data for a single job is too large, the data is split across multiple jobs.

NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
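The job-splitting behavior described above can be sketched as greedy packing of files into jobs. The cap values below are illustrative assumptions, not the module's actual quota handling:

```python
# Sketch only: pack files into load jobs without exceeding per-job caps.
# The cap values are assumptions for illustration, not BigQuery's real quotas.
MAX_FILES_PER_JOB = 10_000        # assumed per-job file-count cap
MAX_BYTES_PER_JOB = 15 * 2**40    # assumed per-job total-size cap

def split_into_jobs(file_sizes):
    """Greedily group file sizes into jobs that respect both caps."""
    jobs, current, current_bytes = [], [], 0
    for size in file_sizes:
        # Start a new job if adding this file would exceed either cap.
        if current and (len(current) >= MAX_FILES_PER_JOB
                        or current_bytes + size > MAX_BYTES_PER_JOB):
            jobs.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        jobs.append(current)
    return jobs
```

With twenty 1 TiB files and a 15 TiB cap, the first fifteen files fill one job and the remaining five go to a second.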

apache_beam.io.gcp.bigquery_file_loads.file_prefix_generator(with_validation=True, pipeline_gcs_location=None, temp_location=None)[source]
class apache_beam.io.gcp.bigquery_file_loads.WriteRecordsToFile(schema, max_files_per_bundle=20, max_file_size=4398046511104, file_format=None)[source]

Bases: DoFn

Write input records to files before triggering a load job.

This transform keeps up to max_files_per_bundle files open to write to. It receives (destination, record) tuples, and it writes the records to different files for each destination.

If there are more than max_files_per_bundle destinations that we need to write to, then those records are grouped by their destination, and later written to files by WriteGroupedRecordsToFile.

It outputs two PCollections: one of written files (WRITTEN_FILE_TAG) and one of spilled records that still need to be written (UNWRITTEN_RECORD_TAG).
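The routing between the two outputs can be sketched as follows. This is a simplified assumption of the DoFn's behavior, not its actual implementation:

```python
# Sketch: route (destination, record) pairs either to an "open file" or to
# the spill output, keeping at most max_files_per_bundle destinations open.
def route_records(records, max_files_per_bundle=20):
    open_destinations = set()
    written, spilled = [], []
    for destination, record in records:
        if (destination in open_destinations
                or len(open_destinations) < max_files_per_bundle):
            open_destinations.add(destination)
            written.append((destination, record))   # goes to an open file
        else:
            spilled.append((destination, record))   # handled later, grouped
    return written, spilled
```

Spilled records would then be grouped by destination and written by WriteGroupedRecordsToFile.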

Initialize a WriteRecordsToFile.

Parameters:
  • max_files_per_bundle (int) – The maximum number of files that can be kept open during execution of this step in a worker. This limit avoids overwhelming the worker memory.

  • max_file_size (int) – The maximum size in bytes for a file to be used in an export job.

UNWRITTEN_RECORD_TAG = 'UnwrittenRecords'
WRITTEN_FILE_TAG = 'WrittenFiles'
display_data()[source]
start_bundle()[source]
process(element, file_prefix, *schema_side_inputs)[source]

Take a (destination, row) tuple and either write it to a file or spill it out.

Destination may be a TableReference or a string, and row is a Python dictionary representing a row to be inserted into BigQuery.

finish_bundle()[source]
class apache_beam.io.gcp.bigquery_file_loads.WriteGroupedRecordsToFile(schema, max_file_size=4398046511104, file_format=None)[source]

Bases: DoFn

Receives a collection of (destination, iterable-of-records) pairs and writes the records to files.

This is different from WriteRecordsToFile because it receives records grouped by destination. This means that it’s not necessary to keep multiple file descriptors open, because we know for sure when records for a single destination have been written out.

Experimental; no backwards compatibility guarantees.

process(element, file_prefix, *schema_side_inputs)[source]
class apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema(project=None, write_disposition=None, test_client=None, additional_bq_parameters=None, step_name=None, load_job_project_id=None)[source]

Bases: DoFn

Update destination schema based on data that is about to be copied into it.

Unlike load and query jobs, BigQuery copy jobs do not support schema field addition or relaxation on the destination table. This DoFn fills that gap by updating destination table schemas to be compatible with the data coming from the source table. As a result, schema field modification options are respected whether data is loaded directly into the destination table or first loaded into temporary tables and then copied into it.

This transform takes as input a (destination, job_reference) pair where the job_reference refers to a completed load job into a temporary table.

This transform emits (destination, job_reference) pairs, where the job_reference refers to a submitted JSON-format load job that performs the schema modification. Note that the input and output job references are not the same.

Experimental; no backwards compatibility guarantees.
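The schema modification options involved are the BigQuery load-job configuration's schemaUpdateOptions values. The snippet below shows an assumed shape for passing them through additional_bq_parameters; treat the exact wiring as illustrative:

```python
# Illustrative shape (assumption) for enabling schema field addition and
# relaxation on a load job via the BigQuery schemaUpdateOptions setting.
additional_bq_parameters = {
    'schemaUpdateOptions': [
        'ALLOW_FIELD_ADDITION',    # new NULLABLE fields may be added
        'ALLOW_FIELD_RELAXATION',  # REQUIRED fields may be relaxed to NULLABLE
    ],
}
```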

start_bundle()[source]
display_data()[source]
process(element, schema_mod_job_name_prefix)[source]
finish_bundle()[source]
class apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs(project=None, create_disposition=None, write_disposition=None, test_client=None, step_name=None, load_job_project_id=None)[source]

Bases: DoFn

Launches jobs to copy from temporary tables into the main target table.

When a job needs to write to multiple destination tables, or when a single destination table needs to have multiple load jobs to write to it, files are loaded into temporary tables, and those tables are later copied to the destination tables.

This transform emits (destination, job_reference) pairs.

TODO(BEAM-7822): In the file loads method of writing to BigQuery, copying from temp_tables to destination_table is not atomic. See: https://issues.apache.org/jira/browse/BEAM-7822

TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
display_data()[source]
setup()[source]
start_bundle()[source]
process(element_list, job_name_prefix=None, unused_schema_mod_jobs=None)[source]
process_one(element, job_name_prefix)[source]
finish_bundle()[source]
class apache_beam.io.gcp.bigquery_file_loads.TriggerLoadJobs(schema=None, project=None, create_disposition=None, write_disposition=None, test_client=None, temporary_tables=False, additional_bq_parameters=None, source_format=None, step_name=None, load_job_project_id=None)[source]

Bases: DoFn

Triggers the load jobs that import data into BigQuery.

Experimental; no backwards compatibility guarantees.

TEMP_TABLES = 'TemporaryTables'
ONGOING_JOBS = 'OngoingJobs'
display_data()[source]
start_bundle()[source]
process(element, load_job_name_prefix, pane_info=PaneInfoParam, *schema_side_inputs)[source]
finish_bundle()[source]
class apache_beam.io.gcp.bigquery_file_loads.PartitionFiles(max_partition_size, max_files_per_partition)[source]

Bases: DoFn

MULTIPLE_PARTITIONS_TAG = 'MULTIPLE_PARTITIONS'
SINGLE_PARTITION_TAG = 'SINGLE_PARTITION'
class Partition(max_size, max_files, files=None, size=0)[source]

Bases: object

can_accept(file_size, no_of_files=1)[source]
add(file_path, file_size)[source]
process(element)[source]
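The Partition bookkeeping can be sketched in pure Python. The behavior below is inferred from the method signatures (a partition accepts files until either the byte-size or file-count cap would be exceeded), not copied from the actual implementation:

```python
# Sketch of Partition, inferred from the signatures above: track files and
# total size, and accept new files only while both caps hold.
class Partition:
    def __init__(self, max_size, max_files):
        self.max_size = max_size    # byte-size cap for this partition
        self.max_files = max_files  # file-count cap for this partition
        self.files = []
        self.size = 0

    def can_accept(self, file_size, no_of_files=1):
        """True if the file(s) fit without exceeding either cap."""
        return (self.size + file_size <= self.max_size
                and len(self.files) + no_of_files <= self.max_files)

    def add(self, file_path, file_size):
        self.files.append(file_path)
        self.size += file_size
```

PartitionFiles would emit a partition to SINGLE_PARTITION_TAG or MULTIPLE_PARTITIONS_TAG depending on whether one partition sufficed for a destination.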
class apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn(test_client=None)[source]

Bases: DoFn

start_bundle()[source]
process(table_reference)[source]
class apache_beam.io.gcp.bigquery_file_loads.BigQueryBatchFileLoads(destination, project=None, schema=None, custom_gcs_temp_location=None, create_disposition=None, write_disposition=None, triggering_frequency=None, with_auto_sharding=False, temp_file_format=None, max_file_size=None, max_files_per_bundle=None, max_partition_size=None, max_files_per_partition=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, test_client=None, validate=True, is_streaming_pipeline=False, load_job_project_id=None)[source]

Bases: PTransform

Takes in a set of elements and inserts them into BigQuery via batch load jobs.

DESTINATION_JOBID_PAIRS = 'destination_load_jobid_pairs'
DESTINATION_FILE_PAIRS = 'destination_file_pairs'
DESTINATION_COPY_JOBID_PAIRS = 'destination_copy_jobid_pairs'
COUNT = 0
verify()[source]
expand(pcoll)[source]