Cdap IO
CdapIO is a transform for reading data from a CDAP source plugin or writing data to a CDAP sink plugin.
Batch plugins support
CdapIO currently supports the following CDAP Batch plugins by referencing the CDAP plugin class name:
- Hubspot Batch Source
- Hubspot Batch Sink
- Salesforce Batch Source
- Salesforce Batch Sink
- ServiceNow Batch Source
- Zendesk Batch Source
Also, any other CDAP Batch plugin based on Hadoop's InputFormat or OutputFormat can be used. Such plugins can easily be added to the list of plugins supported by class name; for more details, please see the CdapIO readme.
Streaming plugins support
CdapIO currently supports CDAP Streaming plugins based on Apache Spark Receiver.
Requirements for CDAP Streaming plugins:
- The CDAP Streaming plugin should be based on a Spark Receiver (Spark 2.4).
- The CDAP Streaming plugin should support working with offsets:
  - The corresponding Spark Receiver should implement the HasOffset interface.
  - Records should have a numeric field that represents the record offset.
Batch reading using CdapIO
In order to read from a CDAP plugin, you will need to pass:
- Key and Value classes. You will need to check if these classes have a Beam Coder available.
- A PluginConfig object with parameters for the certain CDAP plugin.
You can easily build a PluginConfig object using the ConfigWrapper class by specifying:
- Class of the needed PluginConfig.
- A Map<String, Object> parameters map for the corresponding CDAP plugin.
For example:
```java
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into the 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
    new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();
```
Read data by plugin class name
Some CDAP plugins are already supported and can be used just by plugin class name.
For example:
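The example here was lost in extraction; a minimal sketch follows. `MyBatchSourcePlugin` is a hypothetical placeholder for one of the plugin classes supported by class name, and `pluginConfig` is assumed to be built with ConfigWrapper as shown above:

```java
// 'MyBatchSourcePlugin' is a hypothetical plugin class supported by class name;
// see the concrete plugin examples below for real ones.
CdapIO.Read<String, String> readTransform =
    CdapIO.<String, String>read()
        .withCdapPluginClass(MyBatchSourcePlugin.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class);
p.apply("read", readTransform);
```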
Read data with building Batch Plugin
If a CDAP plugin is not supported by plugin class name, you can easily build a Plugin object by passing the following parameters:
- Class of the CDAP Batch plugin.
- The InputFormat class used to connect to your CDAP plugin of choice.
- The InputFormatProvider class used to provide the InputFormat.
Then you will be able to pass this Plugin object to CdapIO.
For example:
```java
CdapIO.Read<String, String> readTransform =
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createBatch(
                MyCdapPlugin.class,
                MyInputFormat.class,
                MyInputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class);
p.apply("read", readTransform);
```
Examples for specific CDAP plugins
CDAP Hubspot Batch Source plugin
```java
SourceHubspotConfig pluginConfig =
    new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, JsonElement> readTransform =
    CdapIO.<NullWritable, JsonElement>read()
        .withCdapPluginClass(HubspotBatchSource.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);
```
CDAP Salesforce Batch Source plugin
```java
SalesforceSourceConfig pluginConfig =
    new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO.Read<Schema, LinkedHashMap> readTransform =
    CdapIO.<Schema, LinkedHashMap>read()
        .withCdapPluginClass(SalesforceBatchSource.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(Schema.class)
        .withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);
```
CDAP ServiceNow Batch Source plugin
```java
ServiceNowSourceConfig pluginConfig =
    new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, StructuredRecord> readTransform =
    CdapIO.<NullWritable, StructuredRecord>read()
        .withCdapPluginClass(ServiceNowSource.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);
```
CDAP Zendesk Batch Source plugin
```java
ZendeskBatchSourceConfig pluginConfig =
    new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, StructuredRecord> readTransform =
    CdapIO.<NullWritable, StructuredRecord>read()
        .withCdapPluginClass(ZendeskBatchSource.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);
```
To learn more, please check out the complete examples.
Batch writing using CdapIO
In order to write to a CDAP plugin, you will need to pass:
- Key and Value classes. You will need to check if these classes have a Beam Coder available.
- locksDirPath, which is the locks directory path where locks will be stored. This parameter is needed for Hadoop External Synchronization (a mechanism for acquiring locks related to the write job).
- A PluginConfig object with parameters for the certain CDAP plugin.
You can easily build a PluginConfig object using the ConfigWrapper class by specifying:
- Class of the needed PluginConfig.
- A Map<String, Object> parameters map for the corresponding CDAP plugin.
For example:
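The example here was lost in extraction; a sketch mirroring the batch reading section follows. `MySinkPluginConfig` and the parameter names are hypothetical placeholders:

```java
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Hypothetical parameter name; read real values e.g. from PipelineOptions.
myPluginConfigParams.put(MyPluginConstants.API_TOKEN_PARAMETER_NAME, pipelineOptions.getApiToken());
// ...
MySinkPluginConfig pluginConfig =
    new ConfigWrapper<>(MySinkPluginConfig.class).withParams(myPluginConfigParams).build();
```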
Write data by plugin class name
Some CDAP plugins are already supported and can be used just by plugin class name.
For example:
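The example here was lost in extraction; a minimal sketch follows, using HubspotBatchSink (one of the sink plugins supported by class name) and assuming `pluginConfig` and `locksDirPath` are defined as described above:

```java
// HubspotBatchSink is supported by class name; 'pluginConfig' and
// 'locksDirPath' are assumed to be built as described in this section.
CdapIO.Write<NullWritable, String> writeTransform =
    CdapIO.<NullWritable, String>write()
        .withCdapPluginClass(HubspotBatchSink.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class)
        .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);
```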
Write data with building Batch Plugin
If a CDAP plugin is not supported by plugin class name, you can easily build a Plugin object by passing the following parameters:
- Class of the CDAP plugin.
- The OutputFormat class used to connect to your CDAP plugin of choice.
- The OutputFormatProvider class used to provide the OutputFormat.
Then you will be able to pass this Plugin object to CdapIO.
For example:
```java
CdapIO.Write<String, String> writeTransform =
    CdapIO.<String, String>write()
        .withCdapPlugin(
            Plugin.createBatch(
                MyCdapPlugin.class,
                MyOutputFormat.class,
                MyOutputFormatProvider.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);
```
Examples for specific CDAP plugins
CDAP Hubspot Batch Sink plugin
```java
SinkHubspotConfig pluginConfig =
    new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO.Write<NullWritable, String> writeTransform =
    CdapIO.<NullWritable, String>write()
        .withCdapPluginClass(HubspotBatchSink.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class)
        .withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);
```
CDAP Salesforce Batch Sink plugin
```java
SalesforceSinkConfig pluginConfig =
    new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO.Write<NullWritable, CSVRecord> writeTransform =
    CdapIO.<NullWritable, CSVRecord>write()
        .withCdapPluginClass(SalesforceBatchSink.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(CSVRecord.class)
        .withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);
```
To learn more, please check out the complete examples.
Streaming reading using CdapIO
In order to read from a CDAP plugin, you will need to pass:
- Key and Value classes. You will need to check if these classes have a Beam Coder available.
- A PluginConfig object with parameters for the certain CDAP plugin.
You can easily build a PluginConfig object using the ConfigWrapper class by specifying:
- Class of the needed PluginConfig.
- A Map<String, Object> parameters map for the corresponding CDAP plugin.
For example:
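The example here was lost in extraction; a sketch mirroring the batch sections follows. `MyStreamingPluginConfig` and the parameter name are hypothetical placeholders:

```java
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Hypothetical parameter name; read real values e.g. from PipelineOptions.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyStreamingPluginConfig pluginConfig =
    new ConfigWrapper<>(MyStreamingPluginConfig.class).withParams(myPluginConfigParams).build();
```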
Read data by plugin class name
Some CDAP plugins are already supported and can be used just by plugin class name.
For example:
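The example here was lost in extraction; a minimal sketch follows, using HubspotStreamingSource as an assumed example of a streaming plugin usable by class name, with `pluginConfig` built as shown above:

```java
// HubspotStreamingSource is assumed here to be usable by class name;
// see the concrete streaming plugin examples below.
CdapIO.Read<NullWritable, String> readTransform =
    CdapIO.<NullWritable, String>read()
        .withCdapPluginClass(HubspotStreamingSource.class)
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class);
p.apply("read", readTransform);
```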
Read data with building Streaming Plugin
If a CDAP plugin is not supported by plugin class name, you can easily build a Plugin object by passing the following parameters:
- Class of the CDAP Streaming plugin.
- getOffsetFn, which is a SerializableFunction that defines how to get a Long record offset from a record.
- receiverClass, which is the Spark (v 2.4) Receiver class associated with the CDAP plugin.
- (Optionally) getReceiverArgsFromConfigFn, which is a SerializableFunction that defines how to get constructor arguments for the Spark Receiver using the PluginConfig object.
Then you will be able to pass this Plugin object to CdapIO.
For example:
```java
CdapIO.Read<String, String> readTransform =
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                MyStreamingPlugin.class,
                myGetOffsetFn,
                MyReceiver.class,
                myGetReceiverArgsFromConfigFn))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class);
p.apply("read", readTransform);
```
Read data with optional parameters
Optionally you can pass the following parameters:
- pullFrequencySec, which is the delay in seconds between polling for new records updates.
- startOffset, which is the inclusive start offset from which the reading should be started.
For example:
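The example here was lost in extraction; a sketch follows, extending the generic streaming read above. The concrete values passed to withPullFrequencySec and withStartOffset are hypothetical:

```java
// Hypothetical values: poll every 1 second, start reading from offset 10.
CdapIO.Read<String, String> readTransform =
    CdapIO.<String, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                MyStreamingPlugin.class,
                myGetOffsetFn,
                MyReceiver.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(String.class)
        .withValueClass(String.class)
        .withPullFrequencySec(1L)
        .withStartOffset(10L);
p.apply("read", readTransform);
```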
Examples for specific CDAP plugins
CDAP Hubspot Streaming Source plugin
```java
HubspotStreamingSourceConfig pluginConfig =
    new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
        .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
    CdapIO.<NullWritable, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                HubspotStreamingSource.class,
                GetOffsetUtils.getOffsetFnForHubspot(),
                HubspotReceiver.class))
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);
```
CDAP Salesforce Streaming Source plugin
```java
SalesforceStreamingSourceConfig pluginConfig =
    new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
        .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
    CdapIO.<NullWritable, String>read()
        .withCdapPlugin(
            Plugin.createStreaming(
                SalesforceStreamingSource.class,
                GetOffsetUtils.getOffsetFnForSalesforce(),
                SalesforceReceiver.class,
                config -> {
                  SalesforceStreamingSourceConfig salesforceConfig =
                      (SalesforceStreamingSourceConfig) config;
                  return new Object[] {
                    salesforceConfig.getAuthenticatorCredentials(),
                    salesforceConfig.getPushTopicName()
                  };
                }))
        .withPluginConfig(pluginConfig)
        .withKeyClass(NullWritable.class)
        .withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);
```
To learn more, please check out the complete examples.
Last updated on 2024/12/30