Google BigQuery patterns

The samples on this page show you common patterns for use with BigQueryIO.

BigQueryIO deadletter pattern

In production systems, it is useful to implement the deadletter pattern with BigQueryIO: any elements that fail during processing by BigQueryIO are output into a separate PCollection for further handling. The samples below print the errors, but in a production system they can be sent to a deadletter table for later correction.

When using STREAMING_INSERTS, you can use the WriteResult object to access a PCollection of the TableRows that failed to be inserted into BigQuery. If you also set the withExtendedErrorInfo property, you can access a PCollection<BigQueryInsertError> from the WriteResult. Each element of that PCollection includes a reference to the table, the data row, and the InsertErrors. The InsertRetryPolicy determines which errors are added to the deadletter queue.
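To make the role of the retry policy concrete, here is a simplified model in plain Python of how a policy might decide whether a failed insert ends up in the deadletter output. This is a sketch, not Beam's implementation: the function name `is_deadlettered` is hypothetical, and the set of reasons treated as non-transient is an assumption chosen for illustration (the sample output on this page shows `reason=invalid` for a missing required field).

```python
# Assumption: error reasons treated as non-transient in this illustration.
NON_TRANSIENT_REASONS = frozenset({'invalid', 'invalidQuery', 'notImplemented'})


def is_deadlettered(policy, reasons):
    """Return True if a failed insert would be emitted instead of retried.

    policy:  'RETRY_ALWAYS', 'RETRY_NEVER', or 'RETRY_ON_TRANSIENT_ERROR'.
    reasons: the error reason strings reported for the failed row.
    """
    if policy == 'RETRY_ALWAYS':
        # Every failure is retried, so nothing reaches the deadletter output.
        return False
    if policy == 'RETRY_NEVER':
        # No failure is retried, so every failed row is emitted.
        return True
    if policy == 'RETRY_ON_TRANSIENT_ERROR':
        # Only rows with at least one non-transient error are emitted.
        return any(reason in NON_TRANSIENT_REASONS for reason in reasons)
    raise ValueError('Unknown retry policy: {}'.format(policy))
```

Under this model, a row rejected with reason `invalid` is deadlettered when retrying only transient errors, while a row that failed with a transient backend error is retried.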

In the Python SDK, the failed inserts are available under the FailedRows key of the result returned by WriteToBigQuery.

      PipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

      Pipeline p = Pipeline.create(options);

      // Create a bug by writing the 2nd value as null. The API will correctly
      // throw an error when trying to insert a null value into a REQUIRED field.
      WriteResult result =
          p.apply(Create.of(1, 2))
              .apply(
                  BigQueryIO.<Integer>write()
                      .withSchema(
                          new TableSchema()
                              .setFields(
                                  ImmutableList.of(
                                      new TableFieldSchema()
                                          .setName("num")
                                          .setType("INTEGER")
                                          .setMode("REQUIRED"))))
                      .to("Test.dummyTable")
                      .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                      // Forcing the bounded pipeline to use streaming inserts
                      .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                      // set the withExtendedErrorInfo property.
                      .withExtendedErrorInfo()
                      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

      result
          .getFailedInsertsWithErr()
          .apply(
              MapElements.into(TypeDescriptors.strings())
                  .via(
                      x -> {
                        System.out.println(" The table was " + x.getTable());
                        System.out.println(" The row was " + x.getRow());
                        System.out.println(" The error was " + x.getError());
                        return "";
                      }));
      p.run();

      /*  Sample output from the pipeline:
       The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
       The row was GenericData{classInfo=[f], {num=null}}
       The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
      */

The same pattern with the Python SDK:

  import apache_beam as beam

  # Create the pipeline.
  schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})

  pipeline = beam.Pipeline()

  errors = (
      pipeline | 'Data' >> beam.Create([1, 2])
      | 'CreateBrokenData' >>
      beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          "<Your Project>:Test.dummy_a_table",
          schema=schema,
          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
          # Force the bounded pipeline to use streaming inserts.
          method='STREAMING_INSERTS',
          create_disposition='CREATE_IF_NEEDED',
          write_disposition='WRITE_APPEND'))
  # Print each failed row. print() returns None, so FlatMap emits nothing.
  result = (
      errors['FailedRows']
      | 'PrintErrors' >>
      beam.FlatMap(lambda err: print("Error Found {}".format(err))))

  pipeline.run()
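Instead of printing, a production pipeline would typically convert each failed element into a row for a deadletter table. The helper below is a minimal sketch of that conversion in plain Python. It assumes each failed element arrives as a `(destination, row)` tuple, optionally followed by the insert errors; the function name `to_deadletter_record` and the deadletter column names are hypothetical and not part of Beam.

```python
import json
from datetime import datetime, timezone


def to_deadletter_record(failed, now=None):
    """Wrap one failed insert in a row for a (hypothetical) deadletter table."""
    # Assumption: the element is (destination, row) or (destination, row, errors).
    destination, row = failed[0], failed[1]
    errors = failed[2] if len(failed) > 2 else None
    timestamp = now or datetime.now(timezone.utc)
    return {
        'destination': str(destination),
        # Store the payload as a JSON string so that rows of any shape fit.
        'payload': json.dumps(row, sort_keys=True, default=str),
        'errors': json.dumps(errors, default=str) if errors is not None else None,
        'failed_at': timestamp.isoformat(),
    }
```

A function like this could be applied to the FailedRows output with beam.Map and its results written with a second WriteToBigQuery step whose schema matches the four columns above. Storing the original row as a JSON string keeps the deadletter schema independent of the schema of the table that rejected it.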