A guide for users who need to connect to a data store that isn't supported by the built-in I/O connectors.

To connect to a data store that isn't supported by Beam's existing I/O connectors, you must create a custom I/O connector. A connector usually consists of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case.

Here are the recommended steps to get started:
  1. Read this overview and choose your implementation. You can email the Beam dev mailing list with any questions you might have. In addition, you can check if anyone else is working on the same I/O connector.
  2. If you plan to contribute your I/O connector to the Beam community, see the Apache Beam contribution guide.
  3. Read the PTransform style guide for additional style guide recommendations.

Sources

For bounded (batch) sources, there are currently two options for creating a Beam source:

  1. Use ParDo and GroupByKey.
  2. Use the Source interface and extend the BoundedSource abstract subclass.

ParDo is the recommended option, as implementing a Source can be tricky. See When to use the Source interface for a list of some use cases where you might want to use a Source (such as dynamic work rebalancing).

(Java only) For unbounded (streaming) sources, you must use the Source interface and extend the UnboundedSource abstract subclass. UnboundedSource supports features that are useful for streaming pipelines, such as checkpointing.

Splittable DoFn is a new sources framework that is under development and will replace the other options for developing bounded and unbounded sources. For more information, see the roadmap for multi-SDK connector efforts.
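To give a feel for the model, here is a minimal sketch of a splittable DoFn in the Python SDK that reads fixed-size records from a file by byte offset. The names ReadFixedSizeRecordsFn and FileOffsetRestrictionProvider, and the fixed 128-byte record format, are assumptions made up for this example; only the RestrictionProvider/RestrictionParam machinery comes from the Beam SDK.

```python
import os

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

RECORD_SIZE = 128  # assumed fixed-size records, purely for illustration


class FileOffsetRestrictionProvider(RestrictionProvider):
    """Describes how a single file can be split into byte ranges."""

    def initial_restriction(self, file_name):
        return OffsetRange(0, os.path.getsize(file_name))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, file_name, restriction):
        return restriction.size()


class ReadFixedSizeRecordsFn(beam.DoFn):
    """Splittable DoFn: the runner may split the byte range and run parts in parallel."""

    def process(
        self,
        file_name,
        tracker=beam.DoFn.RestrictionParam(FileOffsetRestrictionProvider())):
        with open(file_name, "rb") as handle:
            # Align to the first record boundary at or after the restriction start.
            start = tracker.current_restriction().start
            position = ((start + RECORD_SIZE - 1) // RECORD_SIZE) * RECORD_SIZE
            handle.seek(position)
            while tracker.try_claim(position):
                yield handle.read(RECORD_SIZE)
                position += RECORD_SIZE
```

A runner is free to split the OffsetRange restriction and hand the pieces to different workers, which is what gives a splittable DoFn splitting and rebalancing abilities comparable to a Source.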

When to use the Source interface

If you are not sure whether to use Source, feel free to email the Beam dev mailing list and we can discuss the specific pros and cons of your case.

In some cases, implementing a Source might be necessary or result in better performance:

  • Unbounded sources: ParDo does not work for reading from unbounded sources. ParDo does not support checkpointing or mechanisms like de-duping that are useful for streaming data sources.
  • Progress and size estimation: ParDo can't provide hints to runners about progress or the size of the data they are reading. Without an estimate of the data size or of the progress of your read, the runner has no way to guess how large your read will be. Therefore, if the runner attempts to dynamically allocate workers, it does not have any clues as to how many workers your pipeline might need.
  • Dynamic work rebalancing: ParDo does not support dynamic work rebalancing, which is used by some readers to improve the processing speed of jobs. Depending on your data source, dynamic work rebalancing might not be possible.
  • Splitting into parts of a particular size recommended by the runner: ParDo does not receive desired_bundle_size as a hint from runners when performing initial splitting.

For example, you might want to use a Source if you'd like to read from a new file format that contains many records per file, or if you'd like to read from a key-value store that supports read operations in sorted key order.
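To make those trade-offs concrete, here is a minimal sketch of the Source option in the Python SDK: a BoundedSource that simply emits the integers in [0, count). The CountingSource name and the counting behaviour are invented for illustration; the methods shown are the hooks through which a runner gets size estimation (estimate_size), initial splitting at its desired_bundle_size (split), and dynamic work rebalancing (get_range_tracker and read).

```python
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker


class CountingSource(iobase.BoundedSource):
    """Illustrative BoundedSource that emits the integers [0, count)."""

    def __init__(self, count):
        self._count = count

    def estimate_size(self):
        # Lets the runner estimate how large the read will be.
        return self._count

    def get_range_tracker(self, start_position, stop_position):
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        # The range tracker is what enables dynamic work rebalancing.
        return OffsetRangeTracker(start_position, stop_position)

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        # Honour the runner's desired_bundle_size hint during initial splitting.
        bundle_start = start_position
        while bundle_start < stop_position:
            bundle_stop = min(bundle_start + desired_bundle_size, stop_position)
            yield iobase.SourceBundle(
                weight=bundle_stop - bundle_start,
                source=self,
                start_position=bundle_start,
                stop_position=bundle_stop)
            bundle_start = bundle_stop

    def read(self, range_tracker):
        for i in range(range_tracker.start_position(),
                       range_tracker.stop_position()):
            if not range_tracker.try_claim(i):
                return
            yield i
```

Such a source would be applied with beam.io.Read(CountingSource(100)). Note how much of the code is bookkeeping for the runner, which is why ParDo is the recommended option whenever these features are not needed.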

Source lifecycle

Here is a sequence diagram that shows the lifecycle of the Source during the execution of the Read transform of an IO. The comments give useful information to IO developers, such as the constraints that apply to the objects or particular cases such as streaming mode.

[Sequence diagram: Source lifecycle during the Read transform]

Using ParDo and GroupByKey

For data stores or file types where the data can be read in parallel, you can think of the process as a mini-pipeline. This often consists of two steps:

  1. Splitting the data into parts to be read in parallel
  2. Reading from each of those parts

Each of those steps will be a ParDo, with a GroupByKey in between. The GroupByKey is an implementation detail, but for most runners it allows the runner to use a different number of workers in some situations:

  • Determining how to split up the data to be read into chunks
  • Reading the data, which often benefits from more workers

In addition, GroupByKey also allows dynamic work rebalancing to happen on runners that support the feature.

Here are some examples of read transform implementations that use the "reading as a mini-pipeline" model when data can be read in parallel:

  • Reading from a file glob: For example, reading all files in "~/data/**".
    • Get File Paths ParDo: As input, take in a file glob. Produce a PCollection of strings, each of which is a file path.
    • Reading ParDo: Given the PCollection of file paths, read each one, producing a PCollection of records.
  • Reading from a NoSQL database (such as Apache HBase): These databases often allow reading from ranges in parallel.
    • Determine Key Ranges ParDo: As input, receive connection information for the database and the key range to read from. Produce a PCollection of key ranges that can be read in parallel efficiently.
    • Read Key Range ParDo: Given the PCollection of key ranges, read the key range, producing a PCollection of records.
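Here is a sketch of the first example as a composite PTransform in the Python SDK. The ReadFromGlob, _ExpandGlobFn, and _ReadLinesFn names are invented for illustration, and records are assumed to be text lines; the GroupByKey on a random key is the implementation detail mentioned above that lets the runner redistribute the per-file reads across workers.

```python
import random

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class _ExpandGlobFn(beam.DoFn):
    """Get File Paths ParDo: expand a glob into one file path per element."""

    def process(self, glob_pattern):
        for match_result in FileSystems.match([glob_pattern]):
            for metadata in match_result.metadata_list:
                yield metadata.path


class _ReadLinesFn(beam.DoFn):
    """Reading ParDo: read one file and emit its records (here, text lines)."""

    def process(self, file_path):
        with FileSystems.open(file_path) as handle:
            for line in handle.read().splitlines():
                yield line.decode("utf-8")


class ReadFromGlob(beam.PTransform):
    """Read-as-a-mini-pipeline: split ParDo, GroupByKey, then read ParDo."""

    def expand(self, globs):
        return (
            globs
            | "ExpandGlob" >> beam.ParDo(_ExpandGlobFn())
            # Pair each path with a random key and group, so the runner can
            # redistribute the per-file reads across a different number of workers.
            | "AssignKey" >> beam.Map(lambda path: (random.randint(0, 99), path))
            | "Redistribute" >> beam.GroupByKey()
            | "DropKey" >> beam.FlatMap(lambda kv: kv[1])
            | "ReadEachFile" >> beam.ParDo(_ReadLinesFn())
        )
```

It could be applied as p | beam.Create(["~/data/**"]) | ReadFromGlob().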

For data stores or files where reading cannot occur in parallel, reading is a simple task that can be accomplished with a single ParDo+GroupByKey. For example:

  • Reading from a database query: Traditional SQL database queries often can only be read in sequence. In this case, the ParDo would establish a connection to the database and read batches of records, producing a PCollection of those records.
  • Reading from a gzip file: A gzip file must be read in order, so the read cannot be parallelized. In this case, the ParDo would open the file and read in sequence, producing a PCollection of records from the file.
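For the gzip case, a single DoFn is enough. Here is a minimal sketch in the Python SDK; the _ReadGzipLinesFn name and the line-per-record format are assumptions for the example.

```python
import gzip

import apache_beam as beam


class _ReadGzipLinesFn(beam.DoFn):
    """Open one gzip file and emit its lines in order; the read itself is sequential."""

    def process(self, file_path):
        with gzip.open(file_path, "rt", encoding="utf-8") as handle:
            for line in handle:
                yield line.rstrip("\n")


def read_gzip(pipeline, file_path):
    # A single ParDo performs the whole read; nothing inside the file can be parallelized.
    return (
        pipeline
        | "FilePath" >> beam.Create([file_path])
        | "ReadGzip" >> beam.ParDo(_ReadGzipLinesFn())
    )
```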

Sinks

To create a Beam sink, we recommend that you use a ParDo that writes the received records to the data store. To develop more complex sinks (for example, to support data de-duplication when failures are retried by a runner), use ParDo, GroupByKey, and other available Beam transforms.
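Here is a minimal sketch of such a ParDo-based sink in the Python SDK. WriteToMyStore, the connection_config parameter, and the commented-out client call are hypothetical placeholders for whatever client library your data store provides; batching the records in start_bundle/finish_bundle is a common pattern for reducing the number of write calls.

```python
import apache_beam as beam

# Hypothetical client library for the target data store; substitute your own.
# import my_store_client


class _WriteBatchesFn(beam.DoFn):
    """Buffer records within a bundle and write them to the data store in batches."""

    BATCH_SIZE = 500  # illustrative batch size

    def __init__(self, connection_config):
        self._connection_config = connection_config

    def start_bundle(self):
        self._batch = []

    def process(self, record):
        self._batch.append(record)
        if len(self._batch) >= self.BATCH_SIZE:
            self._flush()

    def finish_bundle(self):
        self._flush()

    def _flush(self):
        if self._batch:
            # Hypothetical write call; replace with your data store's client API:
            # my_store_client.connect(self._connection_config).write_batch(self._batch)
            self._batch = []


class WriteToMyStore(beam.PTransform):
    """ParDo-based sink: each worker writes its share of the records."""

    def __init__(self, connection_config):
        self._connection_config = connection_config

    def expand(self, records):
        return records | "WriteBatches" >> beam.ParDo(
            _WriteBatchesFn(self._connection_config))
```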

For file-based sinks, you can use the FileBasedSink abstraction that is provided by both the Java and Python SDKs. See our language-specific implementation guides for more details:

  • Developing I/O connectors for Java
  • Developing I/O connectors for Python

