5. Pipeline I/O




  1. When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may want your pipeline to output its result data to an external storage system.
  2. Beam provides read and write transforms for a number of common data storage types.
  3. If you want your pipeline to read from or write to a data storage format that isn't supported by the built-in transforms, you can implement your own read and write transforms.

5.1. Reading input data

  1. Read transforms read data from an external source and return a PCollection representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new PCollection, though it is most common at the start of your pipeline.
     // Java
     PCollection<String> lines = p.apply(TextIO.read().from("hdfs://some/inputData.txt"));
     # Python
     lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')

5.2. Writing output data

  1. Write transforms write the data in a PCollection to an external data store. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a PCollection's data at any point in your pipeline.
     // Java
     output.apply(TextIO.write().to("gs://some/outputData"));
     # Python
     output | beam.io.WriteToText('hdfs://some/outputData')

5.3. File-based input and output data

5.3.1. Reading from multiple locations

  1. Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator (*) to read all matching input files that have the prefix "input-" and the suffix ".csv" in the given location:
     // Java
     p.apply("ReadFromText",
         TextIO.read().from("protocol://my_bucket/path/to/input-*.csv"));
     # Python
     lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')


  1. To read data from disparate sources into a single PCollection, read each one independently and then use the Flatten transform to create a single PCollection.

5.3.2. Writing to multiple output files

  1. For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can also append a suffix to each output file name by specifying one (withSuffix in Java, file_name_suffix in Python).


  1. The following write transform example writes multiple output files to a location. Each file has the prefix "numbers", a numeric shard tag, and the suffix ".csv".
     // Java
     records.apply("WriteToText",
         TextIO.write().to("protocol://my_bucket/path/to/numbers")
             .withSuffix(".csv"));
     # Python
     filtered_words | 'WriteToText' >> beam.io.WriteToText(
         '/path/to/numbers', file_name_suffix='.csv')

5.4. Beam-provided I/O transforms


  1. See the Beam-provided I/O Transforms page for a list of the currently available I/O transforms.
Document last updated: 2020-01-15 06:41   Author: admin