/r/dataflow

Photograph via snooOG

All about Apache Beam and Google Cloud Dataflow

/r/dataflow

1,302 Subscribers

1

Transcribe the News in Parallel Data Pipelines with Python ​‪Apache Beam‬, ‪Hugging Face‬ and OpenAI‬ (follow up video deploying to Dataflow coming soon)

0 Comments
2024/10/17
14:49 UTC

2

Sql data transfer to bigquery using dataflow

How do I move data from SSMS to bigquery using data flow via the gcp console and NOT the command line? I already created tables using sql in bigquery that have similar schema to the SSMS tables I want to transfer. I chose the sql server to bigquery template on data flow. I am having trouble figuring out the JDBC connection string. Kinda new to this so any help would be appreciated.

11 Comments
2024/05/31
03:36 UTC

3

What is the easiest way to create pipelines programmatically using Python?

I've asked a question here about using the Dataflow REST API to create pipelines using Python, but the thought occurred to me that I may be thinking about the template/pipeline/job hierarchy the wrong way. So I'll frame my situation in another way:

I have created a pipeline using the Dataflow GUI using a Google-provided template (JDBC to BigQuery). How do I programmatically create other pipelines that are copies of this pipeline, but with a couple parameters changed (output table and so on)?

(I am not interested in learning writing a template from scratch using the Beam SDK, as the Google-provided template suits my needs perfectly (it's just copying data from A to B with no frills))

1 Comment
2023/01/08
01:06 UTC

3

Active / thread components?

In a lot of dataflow libraries and systems, the processing bricks/processes/whatever are really merely a process() function at its core, which are called like this process(data[]).

And data contains a dictionary of messages/packets grouped by incoming port name.

To me, this poses several problems:

  • This makes the building blocks essentially stateless. Leading to things like "check if all the input messages required to construct the output message are available on the input ports" otherwise return. Leading to blocking if the number of required input messages are bigger than the capacity of the input connection/buffer.

  • This also leads to the inability to have an "active" building block that could have its own timing, tick system, sleep for a while, connect to a server and listen for events to be injected into the processing graph.

How do you guys see this limitation? Or is it even a limitation? Or am I not getting the dataflow/FBP/etc. paradigm?

1 Comment
2022/12/14
20:53 UTC

2

EOD Event processing

My english is not that good, so I am sorry about typos.

I am working on a streaming solution using dataflow and beam that consumes messages from a Pub/Sub topic. A sender process publish messages to this topic.

A new requirement asks my Streaming program to send an email notification after all the messages for that day is processed. To make this possible, the Sender process sends an event with EOF flag that is true for the last message of that day.

I'm facing challenge because messages in Pub/Sub topic is not ordered, so sometimes I receive the message with EOD flag "true" even before the last message for that day is consumed.

How can we resolve this problem? Do I need to consider FIFO Pub/Sub topic so that messages published in Order is also received in similar order?

1 Comment
2022/12/03
18:05 UTC

2

estimating DataPlex/DataCatalog ballpark charges for > 100TB datasets?

I am trying to get a handle on what ballpark initial few months of charges for enabling services like Data Catalog, DataPlex, Data Fusion & DataPrep could be.

Most of our GCS & BQ datasets are < 100 GBs, but a few in the 20-200TB range & our largest approaching 400 TBs. Poorly planned BQ queries on that dataset have resulted in one off chargest in the $10,000s - something we do our best to avoid.

I am interested in the exploration, labeling and grouping features in DataPlex/DataCatalog, and no-code processing pipeline features in DataFusion & DataPrep. I know that DataPrep is billed separately, but my question is what can I reasonably expect for costs running over these datasets in Plex/Catalog/Fusion?

The pricing calculator offers Data Catalog estimates based on 1MM API calls / month, that seems like alot for one person esp at start/exploratory phase. Storage costs are based on metadata size, which is not listed in details. Whats ratio of logical size to metadata size roughly? 100:1?

Went ahead and used fairly liberal estimates on all potential services used as listed on DataPlex pricing page (Dataflow, Dataproc, BigQuery, Cloud Scheduler)....came out around $200....no bad.

So, I guess bottomline is I am looking to hear from some folks with firsthand experience. Been there, done that & pissed of finance team; or pushed it hard and never really seemed to get that high??

What's the word?

ps - I know there are cost control mechanisms..ya...ya, not trying to establish residence yet or recruit a small team into the effort. Just trying to check it out & avoid landmines.

2 Comments
2022/11/12
19:46 UTC

2

What does this error mean in dataflow? Query uses unsupported SQL features: Only support column reference or struct field access in conjunction clause

I am using dataflow, SQL workspace to build a pipeline which extracts data from bigquery. The dataflow SQL editor shows the SQL query is valid. However the dataflow job fails to complete and gives the error.

What does the error mean? What supports column reference or struct field access in conjunction clause?

Why does the query validate in the dataflow SQL editor but throw an error when the job runs?

Why does the query run OK in bigquery?

ERROR

Invalid/unsupported arguments for SQL job launch: Query uses unsupported SQL features: Only support column reference or struct field access in conjunction clause

SQL QUERY

SELECT
  DISTINCT title,
  url,date
  textbody,
  files.path AS filepath,
  o.text AS text
FROM
  bigquery.table.myproject.mydataset.mytable,
  UNNEST( files ) files
INNER JOIN
  bigquery.table.bigquery.table.myproject.mydataset.extractedtext AS o
ON
  files.path = SUBSTRING(o.uri,18)
WHERE
  files.extractedtext IS null
,
0 Comments
2022/10/29
19:47 UTC

2

Running a pipeline to wait for a message per day? Would it be too costly?

I know that probably I should try to use a scheduled job to trigger the pipeline, but I haven't found a good/updated tutorial on how to do it; I'm still figuring out how to convert my pipeline into a flex template.
Does anyone know if having the pipeline waiting for a message counts for charging? or would I be charged only when an actual run happens?

Thanks.

4 Comments
2022/10/26
17:45 UTC

1

DataFlow Custom Pipeline error

So I've got a very basic custom pipeline in python to test moving data from cloud storage to BigQuery. The majority of it works fine if I just output a csv at the end, but using WriteToBigQuery is giving me errors.

From what I've seen, the syntax is: WriteToBigQuery(table = "{projectName}:{datasetName}.{tableName}", schema = ....) but when I try this I get:

TypeError*: isinstance() arg 2 must be a type, a tuple of types, or a union*

From it checking isinstance(table, TableReference) as part of WriteToBigQuery. I'm not really sure how else I should be inputting the table reference to avoid this.

Any help would be much appreciated!

1 Comment
2022/05/25
14:01 UTC

3

What is the difference between a Job and Pipeline in dataflow?

I cannot find what is the difference between the both, so I have a streaming Job should I be Importing is as a Pipeline. And what does that even do basically

1 Comment
2022/05/17
12:29 UTC

2

Dataflow Tutors

Hi I’m looking for Dataflow tutor with python as code base. Willing to pay.

0 Comments
2022/04/09
01:36 UTC

2

Apache beam Initializer

In my dataflow job, I need to initialize a Config factory and log certain messages in an audit log before actual processing begins.

I have placed the Config factory initialization code + audit logging in a parent class PlatformInitializer
and extending that in my Main Pipeline class.

public class CustomJob extends PlatformInitializer{
	private static final Logger LOG = LoggerFactory.getLogger(CustomJob.class);

	public static void main(String[] args) throws PropertyVetoException {
		CustomJob myCustomjob = new CustomJob();

		// Initialize config factories
		myCustomjob.initialize();

		// trigger dataflow job
		myCustomjob.parallelRead(args);
	}

As a result, I had to also implement Serializable interface in my Pipeline class because beam was throwing error - java.io.NotSerializableException: org.devoteam.CustomJob

Inside PlatformInitializer, I have an initilize() method that contains initialization logic for config factory and also log some initial audit messages.

public class PlatformInitializer {

public void initialize() {

  // Configfactory factory = new Configfactory()  

  // CustomLogger.log("JOB-BEGIN-EVENT" + Current timestamp )  

}

}

My question is - is this right way to invoke some code that needs to be called before pipeline begins execution?

0 Comments
2022/04/02
03:36 UTC

2

Best way to structure a repo with multiple beam pipelines

Do you write a .py file fully encapsulating every pipeline standalone or do you make a base class that others inherit from and share functions/utils accross ?

Thank you !

2 Comments
2022/03/18
19:32 UTC

2

Dataflow experts please! Need some input here to go ahead

I am writing my first Apache beam based pipeline in python and want to run it using Dataflow Runner. In the Pcollection, i first read multiple XML files from GCS bucket(this folder path will be my input parameter) using apacahebeam.fileio. it returns a Pcollection ReadableFile object. Next step i need to use my python function defined which uses simple xmltodict module to parse XML file and return json data. I am facing issue to somehow pass this ReadableFile object to my function as file/string to parse and get the results in required format to then use WritetoText in next PTransform. I am referring to this I have done the same but here I want to use my defined python function and pass the fileio returned object as input to my function. This file path cannot be my input parameter. My input parameters will be 1. GCS path where I have folder containing multiple XML files. 2. GCS path when I want to write my json files.

Thanks in advance.

Waiting for any inputs/responses.

2 Comments
2022/02/24
12:10 UTC

1

Does JDBCIO support Transactions?

I was reading the Javadoc for Apache beam's JDBCIO https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/jdbc/JdbcIO.html

It does not say much about the Transaction support.

I have a Pipeline that process Orders coming from different Partners in a file. At the end of the processing, I need to update related DB tables. I am trying to update Order and Billing table (PostgresDB) at the end of the Dataflow job, and planning to use JdbcIO to update the DB tables directly. Since these tables have referential integrity, where Billing table has "ORDERID" as foreign key, I am looking for ways to update these two tables in a Transaction so that if update fails for any reason, I can roll back the transaction.

Wanted to ask, if you came across any details on how JdbcIO support Transaction?. Also, if you can share your experience in handling this kind of scenario from dataflow job, will be highly appreciated.

3 Comments
2022/02/16
14:00 UTC

2

Profiling Python Dataflow jobs

How can we profile dataflow jobs written using apache beam python sdk? I know about cloud profiler but I am not sure how it will be used for dataflow jobs? If there is any other service or product or framework I can work with to profile the dataflow job

4 Comments
2021/07/26
17:45 UTC

1

What does <project-id>/instanceGroupManagers/<dataflow_job_name> not found mean?

One of my dataflow jobs failed because of this error:

Workflow failed. Causes: Error: Message: The resource 'projects/<project-id>/zones/<region-name>/instanceGroupManagers/<job-name>-07170345-ay75-harness' was not found HTTP Code: 404

Do you know what it means?

1 Comment
2021/07/19
15:18 UTC

7

How do you handle deadlock when using Dataflow/Beam for ETL jobs into relational DBs?

Hi all,

I'm running into deadlocks when using a job to load data into a CloudSQL (MySQL) database. Currently, we're writing to the database in batches, and we've written some logic to get these batched writes to retry later if they run into a deadlock (within a certain number of retries). The database's isolation level is already set to repeatable read (the least strict isolation).

I have two questions:

  1. Where exactly should deadlocks be handled? I thought databases were supposed to handle deadlocks (e.g. aborting and queueing deadlocked transactions and running them after locked resources get released), but it seems like we don't have a choice but to handle deadlocks with our Beam pipeline. Should I consider this more of a database issue or a pipeline issue? There's also the connector between Beam and CloudSQL- is there anything in there that might help us handle deadlocks?

  2. Are there any best practices around dealing with deadlocks when using Beam for ETL? We're not really working with a huge data set or anything. While it's not surprising that writing to tables is a bottleneck, the deadlocks are rather unexpected.

Thanks in advance!

1 Comment
2021/07/06
20:39 UTC

3

How to connect Dataflow with on-prem app.

I am trying to establish connection between dataflow and on-prem application over HTTPS. How can I that?

4 Comments
2021/06/15
02:56 UTC

4

Custom template dead letters

Does anybody used Dataflow to stream JSON messages from pubsub to BigQuery using a custom template? What do you do with run time problems (the message is not well formatted for example, or have a missing key) . According to the Google cloud example code they send it to BigQuery in an Error table. I would prefer to send them to pubsub using the pubsub's dead letter feature. Is that possible? or I should handle the errors myself and push them to a pubsub topic by my own?. Thanks in advance

3 Comments
2021/02/26
14:35 UTC

4

Counting Dead Letter Messages, Capturing Them, and then Alerting

I currently have some events coming into PubSub, my DataFlow code is processing them, detecting some errors, then putting the successful events into one BigQuery table and putting the errored messages into another BigQuery table.

The errored messages should be rare and I want an alert to fire whenever something is put in the error table.

Is there any easy way to setup an alert when I detect an error in Dataflow? I added a metric which increments when an error is detected but I can't setup the alerts to fire correctly (they only fire once on the first increment and never fire again.) Is there an aggregator and aligner which will trigger a conditional if the total count on a metric increases? Or is there a better way to trigger an alert on error (ideally, I'd want an alert to fire if the error count > 0 in some period, say 12 hours.)

Thanks in advance!

1 Comment
2020/11/13
05:40 UTC

6

Creating pipeline using Dataflow as runner with a walk through video

“DataPiepeline using Apache Beam and Google Cloud DataFlow as Runner and BigQuery as DataSink” by Shravan C https://link.medium.com/RuCCuVANmbb

0 Comments
2020/11/13
00:42 UTC

3

High wall time dataflow step

I have a dataflow streaming pipeline which one of it's steps have a high wall time. I need help to figure out what is the meaning of the wall time metric and how does it affect the thoughput of my pipeline. This process should be near realtime.

https://preview.redd.it/1tkmk5uygjq51.png?width=560&format=png&auto=webp&s=452e9f04edcdee00a1863425e5d005e837f10489

1 Comment
2020/10/01
19:59 UTC

1

ModuleNotFoundError on dataflow job created via CloudFunction

I have a problem. Through CloudFunction I create a dataflow job. I use Python. I have two files - main.py and second.py. In main.py I import second.py. When I create manually through gsutila everything is fine (from local files), but if I use CloudFunction - the job is created, but theres a errors:

ModuleNotFoundError: No module named 'second'

Any idea?

3 Comments
2020/09/30
07:51 UTC

3

Trigger a batch pipeline through pubsub

I have a pipeline that fetches data from 3rd party site through requests everytime it is triggered.

I want this pipeline to be triggered only when a certain event/webhook gets triggered.

How do I deploy a pipeline that has this feature ? The way I see it I don't really need a streaming pipeline as the pipeline will run only on particular events ( of low frequency ).

How do I go about this ? Thanks in advance

2 Comments
2020/07/25
12:25 UTC

Back To Top