/r/dataflow
All about Apache Beam and Google Cloud Dataflow
How do I move data from SQL Server (SSMS) to BigQuery using Dataflow via the GCP console and NOT the command line? I already created tables in BigQuery using SQL, with schemas similar to the SSMS tables I want to transfer. I chose the SQL Server to BigQuery template in Dataflow. I'm having trouble figuring out the JDBC connection string. Kinda new to this, so any help would be appreciated.
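For what it's worth, the Microsoft SQL Server JDBC driver's connection string has a predictable shape; this small sketch builds one. Host, port, and database name are placeholders, and credentials are typically supplied in the template's separate username/password fields rather than in the URL:

```python
# Sketch: building the JDBC connection string expected by the Microsoft
# SQL Server driver. Host, port, and database are placeholders; credentials
# usually go in the template's separate username/password fields.
def sqlserver_jdbc_url(host, port, database):
    # Typical shape: jdbc:sqlserver://<host>:<port>;databaseName=<database>;
    return f"jdbc:sqlserver://{host}:{port};databaseName={database};encrypt=true;"

print(sqlserver_jdbc_url("10.0.0.5", 1433, "SalesDB"))
```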
I've asked a question here about using the Dataflow REST API to create pipelines using Python, but the thought occurred to me that I may be thinking about the template/pipeline/job hierarchy the wrong way. So I'll frame my situation in another way:
I have created a pipeline using the Dataflow GUI using a Google-provided template (JDBC to BigQuery). How do I programmatically create other pipelines that are copies of this pipeline, but with a couple parameters changed (output table and so on)?
(I am not interested in learning to write a template from scratch using the Beam SDK, as the Google-provided template suits my needs perfectly; it's just copying data from A to B with no frills.)
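One way to think about it: launching the same Google-provided template again with different parameters effectively creates a copy of the GUI-built pipeline. A sketch using the Dataflow REST API (projects.locations.templates.launch) through google-api-python-client; project, region, and the connection URL placeholder are assumptions for illustration:

```python
# Sketch: re-launching a Google-provided template with different parameters.
# PROJECT, REGION, and the connectionURL value are placeholders.
PROJECT = "my-project"
REGION = "us-central1"
TEMPLATE_PATH = "gs://dataflow-templates/latest/Jdbc_to_BigQuery"

def build_launch_request(job_name, output_table):
    # Body for templates.launch; only the parameters that differ between
    # "copies" of the pipeline need to change.
    return {
        "jobName": job_name,
        "parameters": {
            "connectionURL": "jdbc:...",   # same as the GUI pipeline (placeholder)
            "outputTable": output_table,   # the bit that varies per copy
        },
    }

def launch(job_name, output_table):
    from googleapiclient.discovery import build  # assumption: library installed
    df = build("dataflow", "v1b3")
    return (
        df.projects().locations().templates()
          .launch(projectId=PROJECT, location=REGION,
                  gcsPath=TEMPLATE_PATH,
                  body=build_launch_request(job_name, output_table))
          .execute()
    )
```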
In a lot of dataflow libraries and systems, the processing bricks/processes/whatever are really just a process() function at their core, called like this: process(data[]).
And data contains a dictionary of messages/packets grouped by incoming port name.
To me, this poses several problems:
This makes the building blocks essentially stateless, leading to patterns like "check whether all the input messages required to construct the output message are available on the input ports; otherwise return." That in turn leads to blocking when the number of required input messages exceeds the capacity of the input connection/buffer.
This also rules out an "active" building block that could have its own timing or tick system, sleep for a while, or connect to a server and listen for events to be injected into the processing graph.
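The pattern described above, sketched minimally (port names and the pairing rule are illustrative):

```python
# Minimal sketch of a stateless processing block: process() receives
# messages grouped by input port and must return early until every
# required port has data. Port names and pairing logic are illustrative.
REQUIRED_PORTS = {"left", "right"}

def process(data):
    # "Check if all inputs required to construct the output are available"
    if not REQUIRED_PORTS.issubset(data):
        return None  # nothing to do yet; the caller must buffer and retry
    # Pair one message from each port to form outputs.
    return list(zip(data["left"], data["right"]))

print(process({"left": [1]}))                 # not ready yet
print(process({"left": [1], "right": [2]}))   # produces [(1, 2)]
```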
How do you guys see this limitation? Or is it even a limitation? Or am I not getting the dataflow/FBP/etc. paradigm?
My English is not that good, so I'm sorry about any typos.
I am working on a streaming solution using Dataflow and Beam that consumes messages from a Pub/Sub topic. A sender process publishes messages to this topic.
A new requirement asks my streaming program to send an email notification after all the messages for that day are processed. To make this possible, the sender process sets an EOF flag to true on the last message of the day.
I'm facing a challenge because messages in a Pub/Sub topic are not ordered, so sometimes I receive the message with the EOF flag set to "true" even before the last message for that day is consumed.
How can we resolve this problem? Do I need to consider a FIFO Pub/Sub topic so that messages published in order are also received in the same order?
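One ordering-free workaround (an assumption, not part of the original design): have the sender stamp the EOF marker with the day's total message count, and trigger the notification once that many messages have actually arrived, in whatever order they come. A sketch:

```python
# Sketch: completion tracking that tolerates an early EOF marker. The
# sender is assumed to include the day's total count on the EOF message;
# field names and the callback are illustrative.
class DayCompletionTracker:
    def __init__(self, on_complete):
        self.on_complete = on_complete
        self.seen = {}       # day -> data messages received so far
        self.expected = {}   # day -> total announced by the EOF marker

    def receive(self, day, is_eof=False, total=None):
        if is_eof:
            self.expected[day] = total
        else:
            self.seen[day] = self.seen.get(day, 0) + 1
        # Fires once the last of `total` messages has arrived,
        # even if the EOF marker showed up first.
        if day in self.expected and self.seen.get(day, 0) >= self.expected[day]:
            self.on_complete(day)

done = []
t = DayCompletionTracker(done.append)
t.receive("2023-01-01", is_eof=True, total=2)  # EOF arrives early
t.receive("2023-01-01")
t.receive("2023-01-01")
print(done)
```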
I am trying to get a handle on what ballpark initial few months of charges for enabling services like Data Catalog, DataPlex, Data Fusion & DataPrep could be.
Most of our GCS & BQ datasets are < 100 GBs, but a few are in the 20-200 TB range, and our largest is approaching 400 TBs. Poorly planned BQ queries on that dataset have resulted in one-off charges in the $10,000s - something we do our best to avoid.
I am interested in the exploration, labeling and grouping features in DataPlex/DataCatalog, and no-code processing pipeline features in DataFusion & DataPrep. I know that DataPrep is billed separately, but my question is what can I reasonably expect for costs running over these datasets in Plex/Catalog/Fusion?
The pricing calculator offers Data Catalog estimates based on 1MM API calls / month; that seems like a lot for one person, especially at the start/exploratory phase. Storage costs are based on metadata size, which is not listed in the details. What's the rough ratio of logical size to metadata size? 100:1?
Went ahead and used fairly liberal estimates on all the potential services listed on the DataPlex pricing page (Dataflow, Dataproc, BigQuery, Cloud Scheduler)... came out around $200... not bad.
So I guess the bottom line is I am looking to hear from some folks with firsthand experience. Been there, done that & pissed off the finance team? Or pushed it hard and it never really seemed to get that high?
What's the word?
PS - I know there are cost control mechanisms... ya, ya, not trying to establish residence yet or recruit a small team into the effort. Just trying to check it out & avoid landmines.
I am using the Dataflow SQL workspace to build a pipeline that extracts data from BigQuery. The Dataflow SQL editor shows the SQL query is valid; however, the Dataflow job fails to complete and gives the error below.
What does the error mean? What does "only support column reference or struct field access in conjunction clause" refer to?
Why does the query validate in the dataflow SQL editor but throw an error when the job runs?
Why does the query run OK in bigquery?
ERROR
Invalid/unsupported arguments for SQL job launch: Query uses unsupported SQL features: Only support column reference or struct field access in conjunction clause
SQL QUERY
SELECT DISTINCT
  title,
  url,
  date,
  textbody,
  files.path AS filepath,
  o.text AS text
FROM
  bigquery.table.myproject.mydataset.mytable,
  UNNEST(files) AS files
INNER JOIN
  bigquery.table.myproject.mydataset.extractedtext AS o
ON
  files.path = SUBSTRING(o.uri, 18)
WHERE
  files.extractedtext IS NULL
I know I should probably try a scheduled job to trigger the pipeline, but I haven't found a good/updated tutorial on how to do it; I'm still figuring out how to convert my pipeline into a Flex Template.
Does anyone know if having the pipeline waiting for a message counts toward billing? Or would I be charged only when an actual run happens?
Thanks.
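For reference, once the pipeline is packaged as a Flex Template, a scheduled or on-demand run boils down to one launch command. A sketch (bucket, region, and parameter names are placeholders):

```shell
# Hypothetical launch of a Flex Template batch run, e.g. from Cloud
# Scheduler or any cron-like trigger. Paths and parameters are placeholders.
gcloud dataflow flex-template run "scheduled-run-$(date +%Y%m%d-%H%M%S)" \
  --template-file-gcs-location "gs://my-bucket/templates/my-template.json" \
  --region "us-central1" \
  --parameters input=gs://my-bucket/in,output=gs://my-bucket/out
```

A batch job launched this way only consumes worker resources while it runs; nothing executes between runs.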
So I've got a very basic custom pipeline in Python to test moving data from Cloud Storage to BigQuery. The majority of it works fine if I just output a CSV at the end, but using WriteToBigQuery is giving me errors.
From what I've seen, the syntax is: WriteToBigQuery(table = "{projectName}:{datasetName}.{tableName}", schema = ....), but when I try this I get:
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
from it checking isinstance(table, TableReference) inside WriteToBigQuery. I'm not really sure how else I should be passing the table reference to avoid this.
Any help would be much appreciated!
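For reference, WriteToBigQuery does accept a plain "PROJECT:DATASET.TABLE" spec string (as well as a TableReference), so the string form itself is fine; since the failure happens inside an isinstance() check, one thing worth verifying is that the installed Beam SDK version actually supports your Python version. A sketch of the spec format, mirrored as a standalone parser (names are placeholders):

```python
import re

# The "PROJECT:DATASET.TABLE" spec form accepted by WriteToBigQuery,
# mirrored here as a standalone parser for illustration. The project part
# is optional ("DATASET.TABLE" targets the pipeline's default project).
SPEC = re.compile(r"^(?:(?P<project>[\w\-.]+):)?(?P<dataset>\w+)\.(?P<table>\w+)$")

def parse_table_spec(spec):
    m = SPEC.match(spec)
    if not m:
        raise ValueError(f"not a valid table spec: {spec!r}")
    return m.group("project"), m.group("dataset"), m.group("table")

print(parse_table_spec("myproject:mydataset.mytable"))
```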
I cannot find the difference between the two. I have a streaming job; should I be importing it as a pipeline? And what does that even do, basically?
Hi, I'm looking for a Dataflow tutor with Python as the code base. Willing to pay.
In my dataflow job, I need to initialize a Config factory and log certain messages in an audit log before actual processing begins.
I have placed the Config factory initialization code plus audit logging in a parent class, PlatformInitializer, and extend it in my main pipeline class:
public class CustomJob extends PlatformInitializer {

    private static final Logger LOG = LoggerFactory.getLogger(CustomJob.class);

    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();
        // Initialize config factories
        myCustomjob.initialize();
        // Trigger the Dataflow job
        myCustomjob.parallelRead(args);
    }
}
As a result, I also had to implement the Serializable interface in my pipeline class, because Beam was throwing an error: java.io.NotSerializableException: org.devoteam.CustomJob
Inside PlatformInitializer I have an initialize() method that contains the initialization logic for the config factory and also logs some initial audit messages.
public class PlatformInitializer {

    public void initialize() {
        // Configfactory factory = new Configfactory();
        // CustomLogger.log("JOB-BEGIN-EVENT" + currentTimestamp);
    }
}
My question is: is this the right way to invoke code that needs to run before the pipeline begins execution?
Do you write a .py file fully encapsulating every pipeline standalone, or do you make a base class that others inherit from and share functions/utils across?
Thank you !
I am writing my first Apache Beam pipeline in Python and want to run it with the Dataflow runner. In the pipeline, I first read multiple XML files from a GCS bucket (this folder path will be my input parameter) using apache_beam.io.fileio, which returns a PCollection of ReadableFile objects. Next, I need to apply my own Python function, which uses the simple xmltodict module, to parse each XML file and return JSON data. I'm having trouble passing the ReadableFile object to my function as a file/string so I can parse it, get the results in the required format, and then use WriteToText in the next PTransform. I am referring to this; I have done the same, but here I want to use my own Python function and pass the object returned by fileio as its input. This file path cannot be my input parameter. My input parameters will be: 1. the GCS path of the folder containing multiple XML files; 2. the GCS path where I want to write my JSON files.
Thanks in advance.
Waiting for any inputs/responses.
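A sketch of the parse step being described. The Beam wiring is shown in comments; the parse function uses the stdlib's ElementTree standing in for xmltodict (an assumption) so the example is self-contained:

```python
import json
import xml.etree.ElementTree as ET

# Sketch of the parse step. In the pipeline this would sit after fileio,
# roughly (wiring shown as comments):
#
#   files = (p | fileio.MatchFiles(known_args.input + "/*.xml")
#              | fileio.ReadMatches())
#   jsons = files | beam.Map(lambda f: xml_to_json(f.read_utf8()))
#   jsons | beam.io.WriteToText(known_args.output)

def element_to_dict(elem):
    # Recursively convert an Element into nested dicts; leaves keep text.
    children = list(elem)
    if not children:
        return elem.text
    return {child.tag: element_to_dict(child) for child in children}

def xml_to_json(xml_string):
    root = ET.fromstring(xml_string)
    return json.dumps({root.tag: element_to_dict(root)})

print(xml_to_json("<order><id>42</id><status>ok</status></order>"))
```

Note that repeated sibling tags collapse in this simple sketch; xmltodict handles those by building lists.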
I was reading the Javadoc for Apache beam's JDBCIO https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/jdbc/JdbcIO.html
It does not say much about transaction support.
I have a pipeline that processes orders coming from different partners in a file. At the end of processing, I need to update the related DB tables. I am trying to update the Order and Billing tables (Postgres) at the end of the Dataflow job, and I'm planning to use JdbcIO to update them directly. Since these tables have referential integrity (the Billing table has "ORDERID" as a foreign key), I am looking for a way to update these two tables in one transaction so that, if the update fails for any reason, I can roll it back.
I wanted to ask if you have come across any details on how JdbcIO supports transactions. Also, if you can share your experience handling this kind of scenario from a Dataflow job, it will be highly appreciated.
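One pattern worth considering (an assumption, since JdbcIO doesn't appear to expose a transaction spanning two tables): do both updates from a custom DoFn that owns its own connection, inside a single transaction. The idea, sketched with the stdlib's sqlite3 standing in for Postgres (table and column names are illustrative):

```python
import sqlite3

# Sketch of "update two related tables in one transaction", with sqlite3
# standing in for Postgres/JdbcIO. In Beam this logic would live in a DoFn
# that manages its own connection. Table and column names are illustrative.
def update_order_and_billing(conn, order_id, status, amount):
    with conn:  # opens a transaction; commits on success, rolls back on error
        conn.execute("UPDATE orders  SET status = ? WHERE id = ?",
                     (status, order_id))
        conn.execute("UPDATE billing SET amount = ? WHERE order_id = ?",
                     (amount, order_id))
```

If either UPDATE raises, both are rolled back together, preserving the foreign-key relationship.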
How can we profile Dataflow jobs written using the Apache Beam Python SDK? I know about Cloud Profiler, but I am not sure how it is used for Dataflow jobs. Is there any other service, product, or framework I can use to profile a Dataflow job?
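Besides Cloud Profiler (which Dataflow can enable via the enable_google_cloud_profiler service option, if memory serves), a low-tech complement is to profile a transform's hot path locally under the DirectRunner with the stdlib's cProfile. A sketch; expensive_transform is a stand-in for your real logic:

```python
import cProfile
import io
import pstats

# Sketch: profile a single function the way you'd profile a DoFn's hot
# path locally. expensive_transform is a stand-in for real pipeline logic.
def expensive_transform(element):
    return sum(i * element for i in range(1000))

def profile_fn(fn, *args):
    pr = cProfile.Profile()
    pr.enable()
    result = fn(*args)
    pr.disable()
    out = io.StringIO()
    pstats.Stats(pr, stream=out).sort_stats("cumulative").print_stats(5)
    return result, out.getvalue()

result, report = profile_fn(expensive_transform, 3)
print(report)
```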
One of my dataflow jobs failed because of this error:
Workflow failed. Causes: Error: Message: The resource 'projects/<project-id>/zones/<region-name>/instanceGroupManagers/<job-name>-07170345-ay75-harness' was not found HTTP Code: 404
Do you know what it means?
Hi all,
I'm running into deadlocks when using a job to load data into a Cloud SQL (MySQL) database. Currently we're writing to the database in batches, and we've written some logic to get these batched writes to retry later if they run into a deadlock (within a certain number of retries). The database's isolation level is already set to REPEATABLE READ.
I have two questions:
Where exactly should deadlocks be handled? I thought databases were supposed to handle deadlocks (e.g. aborting and queueing deadlocked transactions and rerunning them after the locked resources are released), but it seems we have no choice but to handle deadlocks in our Beam pipeline. Should I consider this more of a database issue or a pipeline issue? There's also the connector between Beam and Cloud SQL; is there anything in it that might help us handle deadlocks?
Are there any best practices around dealing with deadlocks when using Beam for ETL? We're not really working with a huge data set or anything. While it's not surprising that writing to tables is a bottleneck, the deadlocks are rather unexpected.
Thanks in advance!
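The retry logic mentioned above can be sketched as a bounded retry with jittered exponential backoff; is_deadlock and the write callable are stand-ins for your own code (e.g. checking MySQL error code 1213, ER_LOCK_DEADLOCK):

```python
import random
import time

# Sketch: retry a batched write a bounded number of times with jittered
# exponential backoff when a deadlock is detected. `write_batch` and
# `is_deadlock` are placeholders for the real writer and error check.
def write_with_retries(write_batch, is_deadlock, max_retries=4, base_delay=0.1):
    for attempt in range(max_retries + 1):
        try:
            return write_batch()
        except Exception as exc:
            if attempt == max_retries or not is_deadlock(exc):
                raise  # out of retries, or not a deadlock: propagate
            time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
```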
I am trying to establish a connection between Dataflow and an on-prem application over HTTPS. How can I do that?
Has anybody used Dataflow to stream JSON messages from Pub/Sub to BigQuery using a custom template? What do you do with runtime problems (the message is not well formatted, for example, or has a missing key)? According to the Google Cloud example code, they send such messages to an error table in BigQuery. I would prefer to send them to Pub/Sub using Pub/Sub's dead-letter feature. Is that possible, or should I handle the errors myself and push them to a Pub/Sub topic on my own? Thanks in advance.
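For reference, Pub/Sub's built-in dead-letter topics kick in when a subscriber fails to ack a delivery, which isn't quite the parse-failure case described here; the usual Beam pattern is to route bad records to a tagged side output (beam.pvalue.TaggedOutput) and publish that output to an error topic yourself. The validation/routing step, sketched in plain Python (REQUIRED_KEYS is illustrative):

```python
import json

# Sketch of the validate-and-route step. In Beam this would be a DoFn that
# emits bad records to a tagged side output, later written to an error
# topic with WriteToPubSub. REQUIRED_KEYS is illustrative.
REQUIRED_KEYS = {"id", "timestamp", "payload"}

def route(raw_bytes):
    """Return ('ok', parsed_dict) or ('error', original_bytes)."""
    try:
        msg = json.loads(raw_bytes)
        if not REQUIRED_KEYS.issubset(msg):
            raise ValueError("missing required keys")
        return ("ok", msg)
    except (ValueError, TypeError):
        # Keep the original bytes so nothing is lost on the error path.
        return ("error", raw_bytes)
```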
I currently have some events coming into PubSub, my DataFlow code is processing them, detecting some errors, then putting the successful events into one BigQuery table and putting the errored messages into another BigQuery table.
The errored messages should be rare and I want an alert to fire whenever something is put in the error table.
Is there any easy way to set up an alert when I detect an error in Dataflow? I added a metric that increments when an error is detected, but I can't get the alerts to fire correctly (they fire once on the first increment and never again). Is there an aggregator and aligner that will trigger a condition if the total count on a metric increases? Or is there a better way to trigger an alert on error? Ideally, I'd want an alert to fire if the error count is > 0 in some period, say 12 hours.
Thanks in advance!
“DataPipeline using Apache Beam and Google Cloud DataFlow as Runner and BigQuery as DataSink” by Shravan C https://link.medium.com/RuCCuVANmbb
I have a Dataflow streaming pipeline, and one of its steps has a high wall time. I need help figuring out the meaning of the wall time metric and how it affects the throughput of my pipeline. This process should be near real-time.
I have a problem. Through a Cloud Function I create a Dataflow job, using Python. I have two files: main.py and second.py. In main.py I import second.py. When I create the job manually through gsutil everything is fine (from local files), but if I use the Cloud Function, the job is created but there's an error:
ModuleNotFoundError: No module named 'second'
Any idea?
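A common cause of this particular ModuleNotFoundError is that only main.py gets staged to the workers; the usual fix is to package the extra module and point the pipeline at a setup.py via the setup_file pipeline option. A sketch (package name and version are placeholders):

```python
# setup.py -- sketch: ship second.py to the Dataflow workers alongside
# main.py. Launch the pipeline with --setup_file=./setup.py (or set
# setup_file in the pipeline options). Name and version are placeholders.
import setuptools

setuptools.setup(
    name="my_dataflow_job",
    version="0.1.0",
    py_modules=["second"],  # the module that main.py imports
)
```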
I have a pipeline that fetches data from a 3rd-party site through requests every time it is triggered.
I want this pipeline to be triggered only when a certain event/webhook gets triggered.
How do I deploy a pipeline that has this feature? The way I see it, I don't really need a streaming pipeline, as the pipeline will run only on particular events (of low frequency).
How do I go about this? Thanks in advance.
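One shape this can take (an assumption, not the only option): package the pipeline as a Flex Template and have an HTTP-triggered Cloud Function launch a batch run per webhook call through the Dataflow REST API. Project, region, template path, and parameter names below are placeholders:

```python
import time

# Hypothetical sketch of an HTTP-triggered function launching a batch
# Flex Template run per webhook call. All names/paths are placeholders.
PROJECT = "my-project"
REGION = "us-central1"
TEMPLATE = "gs://my-bucket/templates/fetcher.json"

def build_launch_body(run_id, parameters):
    # Request body for projects.locations.flexTemplates.launch
    return {
        "launchParameter": {
            "jobName": f"webhook-run-{run_id}",
            "containerSpecGcsPath": TEMPLATE,
            "parameters": parameters,
        }
    }

def handle_webhook(request):
    from googleapiclient.discovery import build  # assumption: library available
    df = build("dataflow", "v1b3")
    body = build_launch_body(int(time.time()),
                             {"source_url": "https://example.com/data"})
    return (
        df.projects().locations().flexTemplates()
          .launch(projectId=PROJECT, location=REGION, body=body)
          .execute()
    )
```

Since each run is a batch job, you only pay for workers while a run is in progress.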