Dataflow pipeline design for real-time aggregation analysis
I have a case as below:
1) Use Pub/Sub as the input to Dataflow and load the streaming data into BigQuery
2) Select the aggregated result from BigQuery and publish it to Pub/Sub as the output
3) A client listens to Pub/Sub to display the result
For example, I have sales transactions and want to see the regional (aggregated) sales figures in real time. I know that I can use two pipelines: one Dataflow pipeline to load the data into BigQuery (1), and another to get the aggregated result and push it to Pub/Sub (2).
Is there any way to do this in a single pipeline? I don't want to build an orchestration layer (i.e. after the 1st pipeline finishes, call the 2nd pipeline), and initializing a pipeline is costly.
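A rough single-pipeline sketch of this idea (an illustration, not the poster's design): one streaming Python Beam pipeline reads the transactions from Pub/Sub, writes the raw rows to BigQuery on one branch, and on a second branch aggregates per region in fixed windows and publishes the results back to Pub/Sub, so no second pipeline or orchestration layer is needed. Note that this aggregates inside the pipeline rather than querying BigQuery back out; the topic names, table, schema, and the one-minute window are placeholders.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    transactions = (p
        | 'ReadTransactions' >> beam.io.ReadFromPubSub(
              topic='projects/my-project/topics/sales-transactions')
        | 'Parse' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8'))))

    # Branch 1: persist every raw transaction in BigQuery (streaming inserts).
    _ = (transactions
        | 'WriteRawToBQ' >> beam.io.WriteToBigQuery(
              'my-project:sales.transactions',
              schema='region:STRING,amount:FLOAT,ts:TIMESTAMP'))

    # Branch 2: aggregate per region in fixed one-minute windows and publish
    # the running regional totals back to Pub/Sub for the listening client.
    _ = (transactions
        | 'KeyByRegion' >> beam.Map(lambda t: (t['region'], t['amount']))
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        | 'SumPerRegion' >> beam.CombinePerKey(sum)
        | 'Format' >> beam.Map(lambda kv: json.dumps(
              {'region': kv[0], 'total': kv[1]}).encode('utf-8'))
        | 'PublishAggregates' >> beam.io.WriteToPubSub(
              topic='projects/my-project/topics/regional-sales'))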
See also questions close to this topic
Beam and Dataflow - How to do GroupByKey and sort faster?
I have over 100 GB of bounded data to process. The goal is to maximize throughput. Basically, I need to segment the data into groups, sort each group, and then do some ParDo work. The following code snippet shows how I apply the session window and then do the GroupByKey and sort. I found that GroupByKey is the bottleneck. From reading this blog, I understand that partial combining can significantly reduce the data shuffle. However, in my case, because I'm sorting after the GroupByKey, I guess the data shuffle is going to be over 100 GB anyway. So the questions are:
- Are there other ways to increase GroupByKey throughput in my case?
One workaround I can think of is to compose a query in BigQuery that does roughly the same thing (i.e. segment by the data's time gaps, then group and sort), and leave the rest of the ParDos to Dataflow, so that no GroupByKey is needed (a rough sketch of that query-based workaround follows the snippet below). But session windows are so convenient and save me so much code that I'd really like to avoid doing it "manually" with a query in BigQuery.
PCollection<KV<String, TableRow>> sessionWindowedPairs = rowsKeyedByHardwareId
    .apply("Assign Rows Keyed by HardwareId into Session Windows",
        Window.into(Sessions.withGapDuration(Duration.standardSeconds(200))));

PCollection<KV<String, List<TableRow>>> sortedGPSPerDevice = sessionWindowedPairs
    .apply("Group By HardwareId (and Window)", GroupByKey.create())
    .apply("Sort GPSs of Each Group By DateTime", ParDo.of(new SortTableRowDoFn()));
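Since the posted code uses the Java SDK, the sketch below only illustrates the BigQuery-side workaround described above: assign a session id in SQL using the same 200-second gap, so Dataflow no longer needs the large GroupByKey plus in-memory sort. The table path and the hardware_id/event_time column names are placeholders, and this is an untested sketch, not the poster's solution.

import apache_beam as beam

# Emulate Sessions.withGapDuration(200 s) in SQL: flag a new session whenever
# the gap to the previous event for the same hardware_id exceeds 200 seconds,
# then turn the flags into a running session_id per device.
SESSION_QUERY = """
WITH flagged AS (
  SELECT
    *,
    IF(LAG(event_time) OVER (PARTITION BY hardware_id ORDER BY event_time) IS NULL
       OR TIMESTAMP_DIFF(event_time,
            LAG(event_time) OVER (PARTITION BY hardware_id ORDER BY event_time),
            SECOND) > 200,
       1, 0) AS new_session
  FROM `my-project.my_dataset.gps_rows`
)
SELECT
  *,
  SUM(new_session) OVER (PARTITION BY hardware_id ORDER BY event_time) AS session_id
FROM flagged
"""

with beam.Pipeline() as p:
    rows = (p
            | 'ReadSessionedRows' >> beam.io.Read(beam.io.BigQuerySource(
                  query=SESSION_QUERY, use_standard_sql=True))
            # Rows now carry (hardware_id, session_id). The per-session sort
            # could either stay in a (now much smaller) grouping step, or be
            # pushed fully into SQL with ARRAY_AGG(... ORDER BY event_time).
            )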
Google Dataflow - BigQuery to TFRecords
I would like to use Apache Beam (with Google Dataflow as the runner) to convert data from BigQuery to TFRecords.
Here is a script to illustrate some of the details:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project'
google_cloud_options.job_name = 'my_job_001'
google_cloud_options.staging_location = 'gs://my-bucket/staging'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

destination = 'gs://my-bucket/result'

p = beam.Pipeline(options=options)

# Read the BigQuery query result into a PCollection and write it out as TFRecord files.
lines = (p
         | 'QueryTableStdSQL' >> beam.io.Read(beam.io.BigQuerySource(
               query='SELECT id FROM `bigquery-public-data.hacker_news.comments` LIMIT 100',
               use_standard_sql=True))
         | 'WriteExamples' >> beam.io.WriteToTFRecord(
               file_path_prefix=destination, file_name_suffix='.tfrecord'))

result = p.run()
I'm getting the following error:
I'm fairly new to Apache Beam and would appreciate any direction. Could it be related to setting up the correct coder?
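The error text isn't shown above, so this is only one common fix, under the assumption that the failure comes from WriteToTFRecord's default BytesCoder expecting bytes while BigQuerySource emits dict rows. The sketch converts each row into a serialized tf.train.Example before writing; the pipeline options from the script above are omitted for brevity.

import apache_beam as beam
import tensorflow as tf

def row_to_example(row):
    # Each BigQuery row arrives as a dict; pack the selected 'id' column into a
    # tf.train.Example and serialize it to bytes for WriteToTFRecord.
    example = tf.train.Example(features=tf.train.Features(feature={
        'id': tf.train.Feature(int64_list=tf.train.Int64List(value=[row['id']])),
    }))
    return example.SerializeToString()

destination = 'gs://my-bucket/result'
p = beam.Pipeline()  # in the script above this carries the Dataflow options

lines = (p
         | 'QueryTableStdSQL' >> beam.io.Read(beam.io.BigQuerySource(
               query='SELECT id FROM `bigquery-public-data.hacker_news.comments` LIMIT 100',
               use_standard_sql=True))
         | 'ToSerializedTFExample' >> beam.Map(row_to_example)
         | 'WriteExamples' >> beam.io.WriteToTFRecord(
               file_path_prefix=destination, file_name_suffix='.tfrecord'))

result = p.run()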
Install Julia version >= 0.5 on Google Cloud Dataflow
I'm working on a project based on Google Cloud Dataflow. The idea is to be able to work with Julia programming language files, so I need to install the Julia package on the platform.
I have defined a setup.py file which installs Julia 0.4.7; however, I need a newer version (0.5 at minimum).
['apt-get', '-y', 'install', 'julia']
I have tried to upgrade the Julia version in the setup.py file, but nothing has worked. So I decided to use a step in the pipeline to install Julia.
s = Popen(['curl', '-s', 'https://julialang-s3.julialang.org/bin/linux/x64/0.6/julia-0.6.2-linux-x86_64.tar.gz'],
          stdin=PIPE, stdout=PIPE, bufsize=1)
logging.info("***********************")
logging.info('download ok')
logging.info("***********************")

s = Popen(['tar', '-xvf', 'julia-0.6.2-linux-x86_64.tar.gz'], stdin=PIPE, stdout=PIPE, bufsize=1)
logging.info("***********************")
logging.info('deco ok')
logging.info("***********************")

s = Popen(['ls', '-R'], stdin=PIPE, stdout=PIPE, bufsize=1)
logging.info("***********************")
logging.info(s.stdout.read())
logging.info("***********************")
So I have written this code, which downloads the Julia 0.6.2 package directly from its own web page and then unpacks the file. That works fine locally, but once on the Dataflow platform it doesn't work. The commands are executed correctly; nonetheless, when I use "ls" to see the folders and packages on the machine, I can't find the Julia package anywhere.
Has anybody installed packages by using the pipeline? Have you been able to upgrade a version through the setup.py file? (I know you can specify the version when doing a pip install; however, that doesn't work when using apt-get install.)
I've tried looking on the internet, but there isn't a lot of information about Google Cloud Dataflow.
Thank you very much, any kind of help is welcome!
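Two observations on the snippet above, for what they're worth: curl -s without -o streams the tarball to stdout rather than writing julia-0.6.2-linux-x86_64.tar.gz, so the later tar has nothing to unpack, and Popen does not wait for each command to finish before the next one starts. A more usual route for worker dependencies is to run such commands from custom commands in setup.py, so they execute on every Dataflow worker at start-up. The sketch below follows that pattern (adapted from the Beam Python custom-commands approach); the install paths, symlink location, and package name are assumptions and it has not been tested here.

import subprocess
from distutils.command.build import build as _build

import setuptools

# Shell commands run on each worker: download Julia 0.6.2, unpack it under
# /opt/julia, and put the binary on the PATH. Paths are placeholders.
CUSTOM_COMMANDS = [
    ['curl', '-sL', '-o', '/tmp/julia.tar.gz',
     'https://julialang-s3.julialang.org/bin/linux/x64/0.6/julia-0.6.2-linux-x86_64.tar.gz'],
    ['mkdir', '-p', '/opt/julia'],
    ['tar', '-xzf', '/tmp/julia.tar.gz', '-C', '/opt/julia', '--strip-components=1'],
    ['ln', '-sf', '/opt/julia/bin/julia', '/usr/local/bin/julia'],
]

class build(_build):
    """Run the custom commands as part of the normal build step."""
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

class CustomCommands(setuptools.Command):
    user_options = []

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run(self):
        for command in CUSTOM_COMMANDS:
            subprocess.check_call(command)

setuptools.setup(
    name='julia-worker-deps',
    version='0.0.1',
    packages=setuptools.find_packages(),
    cmdclass={'build': build, 'CustomCommands': CustomCommands},
)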
Google Cloud Storage - delete processed files
Can you please help me here:
I'm batch processing files (JSON files) from Cloud Storage to write the data into BigQuery.
I have a topic created, with a Cloud Function (to process the message and write the data into BQ) subscribed to the topic.
I have created a Dataflow job to notify the topic of any JSON files created/stored in my source bucket.
The above flow processes the JSON files and inserts the rows into the BQ table perfectly.
I want to delete the source JSON file from Cloud Storage after the file is successfully processed. Any input on how this can be done?
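One straightforward option, sketched below with placeholder bucket and object names: delete the blob from inside the Cloud Function using the google-cloud-storage client, once the BigQuery insert has succeeded.

from google.cloud import storage

def delete_processed_file(bucket_name, object_name):
    """Delete a source file from Cloud Storage once its rows are in BigQuery."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.blob(object_name).delete()

# e.g. called at the end of the Cloud Function, after the insert succeeds:
# delete_processed_file('my-source-bucket', 'incoming/sales_2018_01_01.json')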
Configuring maximum rows per request in the Dataflow BigQuery sink
I am using this template: https://cloud.google.com/dataflow/docs/templates/provided-templates#cloudpubsubtobigquery
Reading quota limits under Maximum rows per request here: https://cloud.google.com/bigquery/quotas#streaming_inserts
It recommends keeping a maximum of 500 rows per request.
Where can I configure the maximum rows per request in the BigQuery sink? I've searched the whole documentation but did not find any relevant info.
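The provided template is Java-based and may not expose this as a template parameter, so the sketch below only covers the self-written-pipeline route in the Python SDK, assuming WriteToBigQuery's batch_size argument (which caps rows per streaming insert request) is available in the SDK version used; the table, schema, and sample row are placeholders.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (p
         | 'MakeRows' >> beam.Create([{'field': 'example'}])
         | 'WriteToBQ' >> beam.io.WriteToBigQuery(
               'my-project:my_dataset.my_table',
               schema='field:STRING',
               batch_size=500))  # cap each streaming insert request at 500 rows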