Built-in I/O Transforms

Web APIs I/O connector

The Beam SDKs include a built-in transform, called RequestResponseIO to support reads and writes with Web APIs such as REST or gRPC.

Discussion below focuses on the Java SDK. Python examples will be added in the future; see tracker issue: #30422. Additionally, support for the Go SDK is not yet available; see tracker issue: #30423.

RequestResponseIO Features

Features this transform provides include:

This guide currently focuses on the first two bullet points above, the minimal code requirements and error handling. In the future, it may be expanded to show examples of additional features. Links to additional resources is provided below.

Additional resources

Before you start

To use RequestResponseIO, add the dependency to your Gradle build.gradle(.kts) or Maven pom.xml file. See Maven Central for available versions.

Below shows an example adding the Beam BOM and related dependencies such as Beam core to your build.gradle(.kts) file.

// Apache Beam BOM
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom
implementation("org.apache.beam:beam-sdks-java-bom:2.60.0")

// Beam Core SDK
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core
implementation("org.apache.beam:beam-sdks-java-core")

// RequestResponseIO dependency
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio
implementation("org.apache.beam:beam-sdks-java-io-rrio")

Or using Maven, add the artifact dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-rrio</artifactId>
    <version>2.60.0</version>
</dependency>

RequestResponseIO basics

Minimal code

The minimal code needed to read from or write to Web APIs is:

  1. Caller implementation.
  2. Instantiate RequestResponseIO.

Implementing the Caller

Caller requires only one method override: call, whose purpose is to interact with the API, converting a request into a response. The transform’s DoFn invokes this method within its DoFn.ProcessElement method. The transform handles everything else including repeating failed requests and exponential backoff (discussed more below).

// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse.
class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> {

    @Override
    public MyResponse call(MyRequest request) throws UserCodeExecutionException {

        // Do something with request and return the response.

    }

}

Instantiate RequestResponseIO

Using RequestResponseIO is as simple as shown below. As mentioned, it minimally requires two parameters: the Caller and the expected Coder of the response. (Note: If the concept of a Beam Coder is new to you, please see the Apache Beam Programming Guide on this subject. This guide also has an example below.)

The RequestResponseIO transform returns a Result that bundles any failures and the PCollection of successful responses. In Beam, we call this the additional outputs pattern, which typically requires a bit of boilerplate but the transform takes care of it for you. Using the transform, you get the success and failure PCollections via Result::getFailures and Result::getResponses.

Below shows an abbreviated snippet how the transform may work in your pipeline.

// Step 1. Define the Coder for the response.
Coder<MyResponse> responseCoder = ...

// Step 2. Build the request PCollection.
PCollection<MyRequest> requests = ...

// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection.
Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));

// Step 4a. Do something with the responses.
result.getResponses().apply( ... );

// Step 4b. Apply failures to a dead letter sink.
result.getFailures().apply( ... );

RequestResponseIO takes care of everything else needed to invoke the Caller for each request. It doesn’t care what you do inside your Caller, whether you make raw HTTP calls or use client code. Later this guide discusses the advantage of this design for testing.

API call repeats and failures

As mentioned above, RequestResponseIO returns a Result that bundles both the success and failure PCollections resulting from your Caller. This section provides a little more detail about handling failures and specifics on API call repeats with backoff.

Handling failures

The failures are an ApiIOError PCollection that you may apply to a logging transform or a transform that saves the errors to a downstream sink for later analysis and troubleshooting.

Since ApiIOError is already mapped to a Beam Schema, it has compatibility with most of Beam’s existing I/O connectors. (Note: If the concept of Beam Schemas is new to you, please see the Beam Programming Guide.) For example, you can easily send ApiIOError records to BigQuery for analysis and troubleshooting as shown below without converting the records first to a TableRow.

static void writeFailuresToBigQuery(
    PCollection<ApiIOError> failures,
    TableReference tableReference,
    BigQueryIO.Write.CreateDisposition createDisposition,
    BigQueryIO.Write.WriteDisposition writeDisposition) {

  // PCollection<ApiIOError> failures = ...
  // TableReference tableReference = ...
  // BigQueryIO.Write.CreateDisposition createDisposition = ...
  // BigQueryIO.Write.WriteDisposition writeDisposition = ...

  failures.apply(
      "Dead letter",
      BigQueryIO.<ApiIOError>write()
          .useBeamSchema()
          .to(tableReference)
          .withCreateDisposition(createDisposition)
          .withWriteDisposition(writeDisposition));
}

API call repeats and backoff

Prior to emitting to the failure PCollection, the transform performs a retry for certain errors after a prescribed exponential backoff. Your Caller must throw specific errors, to signal the transform to perform the retry with backoff. Throwing a UserCodeExecutionException will immediately emit the error into the ApiIOError PCollection.

RequestResponseIO will attempt a retry with backoff when Caller throws:

After a threshold number of retries, the error is emitted into the failure PCollection.

Testing

Since RequestResponseIO doesn’t care what you do inside your Caller implementation, this makes some testing more convenient. Instead of relying on direct calls to a real API within some tests, consequently depending on your external resource, you simply implement a version of your Caller returning responses or throwing exceptions, according to your test logic. For example, if you want to test a downstream step in your pipeline for a specific response, say empty records, you could easily do so via the following. For more information on testing your Beam Pipelines, see the Beam Programming Guide.

@Test
void givenEmptyResponse_thenExpectSomething() {
    // Test expects PTransform underTest should do something as a result of empty records, for example.
    PTransform<Iterable<String>, ?> underTest = ...

    PCollection<String> requests = pipeline.apply(Create.of("aRequest"));
    IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
    Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder);

    PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...)

    pipeline.run();
}

// MockEmptyIterableResponse simulates when there are no results from the API.
class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> {
@Override
    public Iterable<String> call(String request) throws UserCodeExecutionException {
        return Collections.emptyList();
    }
}

Practical examples

Below shows two examples that we will bring together in an end-to-end Beam pipeline. The goal of this pipeline is to download images and use Gemini on Vertex AI to recognize the image content.

Note that this example does not replace our current AI/ML solutions. Please see Get started with AI/ML pipelines for more details on using Beam with AI/ML.

Working with HTTP calls directly

We first need to download images. To do so, we need to make HTTP calls to the image URL and emit their content into a PCollection for use with the Gemini API. The value of this example on its own is that it demonstrates how to use RequestResponseIO to make raw HTTP requests.

Define Caller

We implement the Caller, the HttpImageClient, that receives an ImageRequest and returns an ImageResponse.

For demo purposes, the example uses a KV to preserve the raw URL in the returned ImageResponse containing KV.

Abbreviated snippet

Below shows an abbreviated version of the HttpImageClient showing the important parts.

class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

    private static final HttpRequestFactory REQUEST_FACTORY =
        new NetHttpTransport().createRequestFactory();

    @Override
    public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException {

        ImageRequest request = requestKV.getValue();
        GenericUrl url = new GenericUrl(request.getImageUrl());
        HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
        HttpResponse response = imageRequest.execute();

        return KV.of(
            requestKV.getKey(),
            ImageResponse
                .builder()
                // Build ImageResponse from HttpResponse
                .build()
        );
    }

}
Full example

The full implementation is shown below illustrating throwing various exceptions based on the HTTP response code.

/**
 * Implements {@link Caller} to process an {@link ImageRequest} into an {@link ImageResponse} by
 * invoking the HTTP request.
 */
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

  private static final int STATUS_TOO_MANY_REQUESTS = 429;
  private static final int STATUS_TIMEOUT = 408;
  private static final HttpRequestFactory REQUEST_FACTORY =
      new NetHttpTransport().createRequestFactory();

  static HttpImageClient of() {
    return new HttpImageClient();
  }

  /**
   * Invokes an HTTP Get request from the {@param request}, returning an {@link ImageResponse}
   * containing the image data.
   */
  @Override
  public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    ImageRequest request = requestKV.getValue();
    Preconditions.checkArgument(request != null);
    GenericUrl url = new GenericUrl(request.getImageUrl());

    try {
      HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
      HttpResponse response = imageRequest.execute();

      if (response.getStatusCode() >= 500) {
        // Tells transform to repeat the request.
        throw new UserCodeRemoteSystemException(response.getStatusMessage());
      }

      if (response.getStatusCode() >= 400) {

        switch (response.getStatusCode()) {
          case STATUS_TOO_MANY_REQUESTS:
            // Tells transform to repeat the request.
            throw new UserCodeQuotaException(response.getStatusMessage());

          case STATUS_TIMEOUT:
            // Tells transform to repeat the request.
            throw new UserCodeTimeoutException(response.getStatusMessage());

          default:
            // Tells the tranform to emit immediately into failure PCollection.
            throw new UserCodeExecutionException(response.getStatusMessage());
        }
      }

      InputStream is = response.getContent();
      byte[] bytes = ByteStreams.toByteArray(is);

      return KV.of(
          key,
          ImageResponse.builder()
              .setMimeType(request.getMimeType())
              .setData(ByteString.copyFrom(bytes))
              .build());

    } catch (IOException e) {

      // Tells the tranform to emit immediately into failure PCollection.
      throw new UserCodeExecutionException(e);
    }
  }
}

Define request

ImageRequest is the custom request we provide the HttpImageClient, defined in the example above, to invoke the HTTP call that acquires the image. This example happens to use Google AutoValue, but you can use any custom Serializable Java class as you would in any Beam PCollection, including inherent Java classes such as String, Double, etc. For convenience, this example uses @DefaultSchema(AutoValueSchema.class) allowing us to map our custom type to a Beam Schema automatically based on its getters.

/** An HTTP request for an image. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageRequest implements Serializable {

  static final TypeDescriptor<ImageRequest> TYPE = TypeDescriptor.of(ImageRequest.class);
  private static final Map<String, String> EXT_MIMETYPE_MAP =
      ImmutableMap.of(
          "jpg", "image/jpeg",
          "jpeg", "image/jpeg",
          "png", "image/png");

  /** Derive the MIME type of the image from the url based on its extension. */
  private static String mimeTypeOf(String url) {
    String ext = FileNameUtils.getExtension(url);
    if (!EXT_MIMETYPE_MAP.containsKey(ext)) {
      throw new IllegalArgumentException(
          String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
    }
    return EXT_MIMETYPE_MAP.get(ext);
  }

  static Builder builder() {
    return new AutoValue_ImageRequest.Builder();
  }

  /** Build an {@link ImageRequest} from a {@param url}. */
  static ImageRequest of(String url) {
    return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
  }

  /** The URL of the image request. */
  abstract String getImageUrl();

  /** The MIME type of the image request. */
  abstract String getMimeType();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setImageUrl(String value);

    abstract Builder setMimeType(String value);

    abstract ImageRequest build();
  }
}

Define response

ImageResponse is the custom response we return from the HttpImageClient, defined in the example above, that contains the image data as a result of calling the remote server with the image URL. Again, this example happens to use Google AutoValue, but you can use any custom Serializable Java class as you would in any Beam PCollection including inherent Java classes such as String, Double, etc.

/** An HTTP response of an image request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageResponse implements Serializable {

  static Builder builder() {
    return new AutoValue_ImageResponse.Builder();
  }

  /** The MIME type of the response payload. */
  abstract String getMimeType();

  /** The payload of the response containing the image data. */
  abstract ByteString getData();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMimeType(String value);

    abstract Builder setData(ByteString value);

    abstract ImageResponse build();
  }
}

Define response coder

RequestResponseIO needs the response’s Coder as its second required parameter, shown in the example below. Please see the Beam Programming Guide for more information about Beam Coders.

/** A {@link CustomCoder} of an {@link ImageResponse}. */
class ImageResponseCoder extends CustomCoder<ImageResponse> {
  public static ImageResponseCoder of() {
    return new ImageResponseCoder();
  }

  private static final Coder<byte[]> BYTE_ARRAY_CODER = ByteArrayCoder.of();
  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();

  @Override
  public void encode(ImageResponse value, OutputStream outStream)
      throws CoderException, IOException {
    BYTE_ARRAY_CODER.encode(value.getData().toByteArray(), outStream);
    STRING_CODER.encode(value.getMimeType(), outStream);
  }

  @Override
  public ImageResponse decode(InputStream inStream) throws CoderException, IOException {
    byte[] data = BYTE_ARRAY_CODER.decode(inStream);
    String mimeType = STRING_CODER.decode(inStream);
    return ImageResponse.builder().setData(ByteString.copyFrom(data)).setMimeType(mimeType).build();
  }
}

Acquire image data from URLs

Below shows an example how to bring everything together in an end-to-end pipeline. From a list of image URLs, the example builds the PCollection of ImageRequests that is applied to an instantiated RequestResponseIO, using the HttpImageClient Caller implementation.

Any failures, accessible from the Result’s getFailures getter, are outputted to logs. As already discussed above, one could write these failures to a database or filesystem.

  /** Example demonstrating downloading a list of image URLs using {@link RequestResponseIO}. */
  static void readFromGetEndpointExample(List<String> urls, Pipeline pipeline) {
    //        Pipeline pipeline = Pipeline.create();
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Convert the list of URLs to a PCollection of ImageRequests.
    PCollection<KV<String, ImageRequest>> requests = Images.requestsOf(urls, pipeline);

    // Step 2: RequestResponseIO requires a Coder as its second parameter.
    KvCoder<String, ImageResponse> responseCoder =
        KvCoder.of(StringUtf8Coder.of(), ImageResponseCoder.of());

    // Step 3: Process ImageRequests using RequestResponseIO instantiated from the Caller
    // implementation and the expected PCollection response Coder.
    Result<KV<String, ImageResponse>> result =
        requests.apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), responseCoder));

    // Step 4: Log any failures to stderr.
    result.getFailures().apply("logErrors", Log.errorOf());

    // Step 5: Log output to stdout.
    Images.displayOf(result.getResponses()).apply("logResponses", Log.infoOf());
  }

The pipeline output, shown below, displays a summary of the downloaded image, its URL, mimetype and size.

KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130}
KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671}
KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809}
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752}

Using API client code

The last example demonstrated invoking HTTP requests directly. However, there are some API services that provide client code that one should use within the Caller implementation. Using client code within Beam presents unique challenges, namely serialization. Additionally, some client code requires explicit handling in terms of setup and teardown.

RequestResponseIO can handle an additional interface called SetupTeardown for these scenarios.

The SetupTeardown interface has only two methods, setup and teardown.

interface SetupTeardown {
    void setup() throws UserCodeExecutionException;
    void teardown() throws UserCodeExecutionException;
}

The transform calls these setup and teardown methods within its DoFn’s @Setup and @Teardown, methods respectively.

The transform also handles retries with backoff, likewise dependent on the thrown Exception, as discussed previously in this guide.

Define Caller with SetupTeardown

Below is an example that adapts Vertex AI Gemini Java Client to work in a Beam pipeline using RequestResponseIO, adding usage of the SetupTeardown interface, in addition to the required Caller. It has a bit more boilerplate than the simple HTTP example above.

Abbreviated snippet

An abbreviated snippet showing the important parts is shown below.

The setup method is where the GeminiAIClient instantiates VertexAI and GenerativeModel, finally closing VertexAI during teardown. Finally, its call method looks similar to the HTTP example above, where it takes a request, uses it to invoke an API, and returns the response.

class GeminiAIClient implements
    Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
    SetupTeardown {

    @Override
    public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
    throws UserCodeExecutionException {
        GenerateContentResponse response = client.generateContent(request.getContentsList());
        return KV.of(requestKV.getKey(), response);
    }

    @Override
    public void setup() throws UserCodeExecutionException {
        vertexAI = new VertexAI(getProjectId(), getLocation());
        client = new GenerativeModel(getModelName(), vertexAI);
    }

    @Override
    public void teardown() throws UserCodeExecutionException {
        vertexAI.close();
    }
}
Full example

Below shows the full example. Key to this example is that com.google.cloud.vertexai.VertexAI and com.google.cloud.vertexai.generativeai.GenerativeModel are not serializable and therefore need to be instantiated with transient. You can ignore @MonotonicNonNull if your java project does not use the https://checkerframework.org/.

/**
 * Example {@link Caller} and {@link SetupTeardown} implementation for use with {@link
 * RequestResponseIO} to process Gemini AI {@link GenerateContentRequest}s into {@link
 * GenerateContentResponse}s.
 */
@AutoValue
abstract class GeminiAIClient
    implements Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
        SetupTeardown {

  static Builder builder() {
    return new AutoValue_GeminiAIClient.Builder();
  }

  static final String MODEL_GEMINI_PRO = "gemini-pro";
  static final String MODEL_GEMINI_PRO_VISION = "gemini-pro-vision";

  private transient @MonotonicNonNull VertexAI vertexAI;
  private transient @MonotonicNonNull GenerativeModel client;

  @Override
  public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    GenerateContentRequest request = requestKV.getValue();

    if (request == null) {
      throw new UserCodeExecutionException("request is empty");
    }

    if (request.getContentsList().isEmpty()) {
      throw new UserCodeExecutionException("contentsList is empty");
    }

    try {

      GenerateContentResponse response =
          checkStateNotNull(client).generateContent(request.getContentsList());

      return KV.of(key, response);

    } catch (IOException e) {
      throw new UserCodeExecutionException(e);
    }
  }

  @Override
  public void setup() throws UserCodeExecutionException {
    vertexAI = new VertexAI(getProjectId(), getLocation());
    client = new GenerativeModel(getModelName(), vertexAI);
  }

  @Override
  public void teardown() throws UserCodeExecutionException {
    if (vertexAI != null) {
      vertexAI.close();
    }
  }

  abstract String getModelName();

  abstract String getProjectId();

  abstract String getLocation();

  @AutoValue.Builder
  abstract static class Builder {

    abstract Builder setModelName(String name);

    abstract Optional<String> getModelName();

    abstract Builder setProjectId(String value);

    abstract Builder setLocation(String value);

    abstract GeminiAIClient autoBuild();

    final GeminiAIClient build() {
      if (!getModelName().isPresent()) {
        setModelName(MODEL_GEMINI_PRO);
      }
      return autoBuild();
    }
  }

Ask Gemini AI to identify the image

Now let’s combine the previous example of acquiring an image to this Gemini AI client to ask it to identify the image.

Below is what we saw previously but encapsulated in a convenience method. It takes a List of urls, and returns a PCollection of ImageResponses containing the image data.

  /**
   * Processes a list of raw image URLs into a {@link ImageResponse} {@link PCollection} using
   * {@link RequestResponseIO}. The resulting {@link KV#getKey} is the original image URL.
   */
  static Result<KV<String, ImageResponse>> imagesOf(List<String> urls, Pipeline pipeline) {

    Coder<KV<String, ImageResponse>> kvCoder = KvCoder.of(STRING_CODER, ImageResponseCoder.of());

    return requestsOf(urls, pipeline)
        .apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), kvCoder));
  }

Next we convert the ImageResponses into a PCollection of GenerateContentRequests.

    // PCollection<KV<Struct, ImageResponse>> imagesKV = ...

    return imagesKV
        .apply(
            stepName,
            MapElements.into(requestKVType)
                .via(
                    kv -> {
                      String key = kv.getKey();
                      ImageResponse safeResponse = checkStateNotNull(kv.getValue());
                      ByteString data = safeResponse.getData();
                      return buildAIRequest(key, prompt, data, safeResponse.getMimeType());
                    }))
        .setCoder(kvCoder);

Finally, we apply the PCollection of GenerateContentRequests to RequestResponseIO, instantiated using the GeminiAIClient, defined above. Notice instead of RequestResponseIO.of, we are using RequestResponseIO.ofCallerAndSetupTeardown. The ofCallerAndSetupTeardown method just tells the compiler that we are providing an implementation of both the Caller and SetupTeardown interfaces.

    //    PCollection<KV<Struct, GenerateContentRequest>> requestKV = ...
    //    GeminiAIClient client =
    //            GeminiAIClient.builder()
    //                    .setProjectId(options.getProject())
    //                    .setLocation(options.getLocation())
    //                    .setModelName(MODEL_GEMINI_PRO_VISION)
    //                    .build();

    return requestKV.apply(
        "Ask Gemini AI", RequestResponseIO.ofCallerAndSetupTeardown(client, responseCoder));

The full end-to-end pipeline is shown below.

  /** Demonstrates using Gemini AI to identify a images, acquired from their URLs. */
  static void whatIsThisImage(List<String> urls, GeminiAIOptions options) {
    //        GeminiAIOptions options = PipelineOptionsFactory.create().as(GeminiAIOptions.class);
    //        options.setLocation("us-central1");
    //        options.setProjectId("your-google-cloud-project-id");
    //
    //
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Instantiate GeminiAIClient, the Caller and SetupTeardown implementation.
    GeminiAIClient client =
        GeminiAIClient.builder()
            .setProjectId(options.getProject())
            .setLocation(options.getLocation())
            .setModelName(MODEL_GEMINI_PRO_VISION)
            .build();

    Pipeline pipeline = Pipeline.create(options);

    // Step 2: Download the images from the list of urls.
    Result<KV<String, ImageResponse>> getImagesResult = Images.imagesOf(urls, pipeline);

    // Step 3: Log any image download errors.
    getImagesResult.getFailures().apply("Log get images errors", Log.errorOf());

    // Step 4: Build Gemini AI requests from the download image data with the prompt 'What is this
    // picture?'.
    PCollection<KV<String, GenerateContentRequest>> requests =
        buildAIRequests("Identify Image", "What is this picture?", getImagesResult.getResponses());

    // Step 5: Using RequestResponseIO, ask Gemini AI 'What is this picture?' for each downloaded
    // image.
    Result<KV<String, GenerateContentResponse>> responses = askAI(client, requests);

    // Step 6: Log any Gemini AI errors.
    responses.getFailures().apply("Log AI errors", Log.errorOf());

    // Step 7: Log the result of Gemini AI's image recognition.
    responses.getResponses().apply("Log AI answers", Log.infoOf());

    pipeline.run();
  }

Below shows an abbreviated output of running the full pipeline, where we see the result of Gemini AI identifying the images.

KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, candidates {
    content {
        role: "model"
        parts {
            text: " This is a picture of a chocolate bar."
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, candidates {
    content {
        role: "model"
        parts {
            text: " The picture is a dog walking application form. It has two sections, one for information
                    about the dog and one for information about the owner. The dog\'s name is Fido,
                    he is a Cavoodle, and he is black and tan. He is 3 years old and has a friendly
                    temperament. The owner\'s name is Mark, and his phone number is 0491570006. He would
                    like Fido to be walked once a week on Tuesdays and Thursdays in the morning."
        }
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg
    content {
        role: "model"
        parts {
            text: " The picture shows a basket of croissants. Croissants are a type of pastry that is made
                    from a yeast-based dough that is rolled and folded several times in the rising process.
                    The result is a light, flaky pastry that is often served with butter, jam, or chocolate.
                    Croissants are a popular breakfast food and can also be used as a dessert or snack."
        }
    }
}