Big Query

BigQuery Error : UPDATE or DELETE statement over table would affect rows in the streaming buffer, which is not supported

in case you get this error, you are most likely tryto detelte data which falls in range of windo of time the streaming insert is using.

Streaming insert is a services that allow ingestion of events in real time to big query.

something it take upto 90 mins to ingest the data. thus…. try to delete data from further back in time for example

DELETE FROM `MyProject.MyDataset.MyTable` 
WHERE id LIKE '%BigDataDemystified%'


Airflow Performance tuning in 5 min

Motivation – if you are going to use dynamic operators , the default setting will now work. for that i suggest the below settings.

Notice this configuration as provided AS IS, we not guaranteed.

Usually the config is in ~/airflow/airflow.cfg

This was tested in DEV environment only. Be sure to understand what your are doing.

# in the pool. 0 indicates no limit. default is 5
sql_alchemy_pool_size = 0

# max_overflow can be set to -1 to indicate no overflow limit;
# no limit will be placed on the total number of concurrent connections.  	
sql_alchemy_max_overflow = -1

# the max number of task instances that should run simultaneously
# on this airflow installation	
parallelism = 64
# The number of task instances allowed to run concurrently by the scheduler
# i suggest double the defaults after installation.
dag_concurrency = 32

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 32

# How long before timing out a python file import
# default is 30, i suggest 3000 for dynamic dag operators.
dagbag_import_timeout = 3000

# How long before timing out a DagFileProcessor, which processes a dag file
# default is 50 , i suggest 5000
dag_file_processor_timeout = 5000

# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run. my default was 2. i suggest 4 times the #default.
max_threads = 8

#consider change the heart rate scheduling of scheduler
 If the last scheduler heartbeat happened more than scheduler_health_check_threshold
# ago (in seconds), scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 60


Google Search Console API Python Demystified

  1. get started here, by google

2. you a verified gmail account in google search console – (say your email).

3. get api credentials using the setup tool from the link above

Select your GCP project.


Which API are you using ?

Google Search Console API

Where you will be calling the api form?

application data

are you planning to use this API with App engine or compute engine?


My answer led me to an answer , that i don’t need special authentication – use your own Gmail credentials. I choose to OAuth client ID and created service account anyways. you need the Client ID and Client_SECRET for the python script.

Other UI and User data? –> service acount

Options on getting started on google search console api via python:

  • Notice if that fails you can always you data studio connector to Search Console, it works like a charm.

Once you get the list of websites, this means you passed authentication. If the script return no errors and now websites – try another gmail user.

Once you got through authentication issues, try the below manual for detailed data report options:

  • install the python package:
pip install --upgrade google-api-python-client

  • Open the samples repository and copy both client_secrets.json and to your local directory.
  • Edit client_secrets.json, which you copied earlier, replacing client_id andclient_secret with your own values from previous section
  • run
python 2015-05-01 2015-05-30

Notice if you used


The an authtication file was created :


Cavities and common errors while connecting to google search API console :

  1. if your reason is still not authorized – try another gmail user in chrome incognito mode.
  2. you may want to consider using the gmail user with property owner permissions.
  3. Pay attention to –noauth_local_webserver flag in, this might help in authentication errors. try using a chrome in cognito browser, login with the authorized user for this.
  4. Notice the url is accurate in the form of pr you will get authentication error.

Once you got everything up and running, you going to need dimensions an filter documentation :

Our python to get metrics for dimensions (country,device,page,query):


if you are using a domain property name instead of URL use the below:
sites that are of type “Domain” are now prefixed with “sc-domain:”


python 2020-02-01 2020-02-02

Notice 2: – when you want all the data (limitation of 25000 rows per call):

Notice 3: search console API limits per amount of query per day, TBD.

Notice 4: API keeps 16 months back.

Notice 5: Mozcast report to track changes of google algorithm.

Notice 6: The data brought from the API will be 100% accurate.

Data Engineering, GCP, python, windows

How to Install Python and PIP on Windows running on GCP ?

This is blog a “cut the bulltishit” and give me what i need to get started blog. end to end if this is your first time – 1 hour you are up and running.

The business use case: the data science team need a server with GUI to run python scripts daily and keep changing them manually until the get the POC results expected.

The technical steps to install GCE machine with windows OS and GUI

  1. Install GCE machine, like any other in GCE, but change the boot disk to run “windows server” and version should be: ” Windows server 2016 datacenter with desktop Experience” . In addition, allow access Scope “Allow full access to all cloud API’s
  2. you need to confirm RDP network access , port 3389
  3. press the down facing arrows on the right of the RDP button to set password for your users.
  4. press RDP on the GCE console, if you are using chrome, install the chrome RDP plugin, it will simply your RDP experience , use the password from “3” and no need for domain, Notice you can change the screen size in the plugin” options

Technical steps to install Python and pip on window server machine

  1. Disable IE Enhanced Security, I used this manual . basically : Server Manager –> Local Server –> IE Enhanced Security Configuration >> Off
  2. Installed python using this blog. don’t for get to right click and “run as administrator“. browse to : download latest version. customize the installation to ensure all components are installed including pip, and ADD to PATH.
  3. To ensure python is accessible everywhere = ensured the path is updated using this blog. sample path :”C:\Users\Username\AppData\Local\Programs\Python\Python37
  4. If you have not installed pip, Install pip using this blog, don’t forget to add it to the path.
  5. used this video to schedule python to run daily. make sure to mark in the properties, that the job should run even if the user is not logged in.

Further more, some official Google compute engined tutorials


Airflow Dynamic Operators used in google ad manager API data pipeline | GAM demystified

Following the previous blog of GAM demystified, we created a python script that connects to Google Ad Manager API and download a file based on metrics and dimensions. in this blog we will use that script , and wrap in a new airflow job.


How wrap python script with is own prerequisites of python packages?

I create a stand alone GCE machine , with all the python packages required for the GAM api and python 3. and i am going to use airflow to connect remotely via ssh and then running the script there. the instance should the cheapest possible.

How to connect to a remote machine in GCE via airflow?

I used bashOperator to ran the below gcloud utility to connect the remote machine, command:

bash_gsutil_mv_cmd='gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid ls /tmp/"'

How to make an airflow ETL job recurrent? and why?

I am going to use a file with naming convention of date in each python run of the script , i.e the start and end will be the same day, so the output of the script will be one day worth of day. Then I am going to run them all in parallel based on the amount of days i need to be overridden. say 3 days. before i do that, I need to ensure there some cleanup of older files, to avoid duplication in the data.

Also i am going to create special python function to loop over date ranges. so when i create the dynamic operators look, each operator will run on a different day, running the instance of the python with the respective date.

Airflow Dynamic Operator considerations You need to:

  1. Set the downstream of each operator. and dynamic task name per operator.
  2. Set the retries to 0, b/c sometimes cleanup of non existing files is ok. (think first run of airflow)
  3. Set the trigger_rule=”all_done”, because sometime API may fail, and it is ok.

Example of setting downstream operator dynamically (bash_cleanup is dynamically created operator with different task id) :


How to load GAM api files to BigQuery via airflow?

  1. one time load manually it into the BigQuery, “using automatically detect schema”, this will create a table which is not partitioned. (using a partition based on ingesting time is not a good option in this use case.)
  2. Then use this blog to create table as select in big query using “Dimension_DATE” as your partition.
AS SELECT * from `MyProject.DATA_LAKE_GOOGLE_US.example_report`

also you can add an optional operator to load the data in the end, notice the WRITE _disposition and create_disposition:

load_to_bq_from_gcs = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    		create_disposition='CREATE_NEVER', # b/c of the manual CTAS
    		write_disposition='WRITE_TRUNCATE', #overwrite?

The full example of Airflow DAG to run python scripts based on remote machine is below and also committed in out GIT.

def daterange(start_date, end_date):
    for n in range(int ((end_date - start_date).days)):
        yield start_date + timedelta(n)

### start & end date = delta period.
## -7 days?
## deleta must be -3 or lower (negative number), -3 to produce days of history from today including today.
start_date = + datetime.timedelta(delta)
end_date =

bash_run_report_remotly_cmd='gcloud beta compute --project myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/ --start 2020-02-27 --end 2020-02-27"'

bash_gsutil_mv_cmd='gcloud beta compute --project ynet-data-myProjectName ssh myInstanceNAme --internal-ip --zone us-central1-a --command "sudo -u omid gsutil -m mv /tmp/*report_example_using_service_account_with_date_range* gs://myBucket/google/gam/example_report"'

#the python creates a random temporary file name suffix, so we delete the files in teh bucket before we proceed 
bash_cleanup_cmd='gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_2020-02-26*'

with models.DAG(
        # Continue to run DAG once per day
        default_args=default_dag_args) as dag:

	start = DummyOperator(task_id='start')
	wait = DummyOperator(task_id='wait',trigger_rule="all_done")
	end = DummyOperator(task_id='end',trigger_rule="all_done")
	#notice if delta has to be negative, -3 or lower,  so will have some dates in date range - you wont have an operator.
	for single_date in daterange(start_date, end_date):

		##notice trigger_rule="all_done"
		bash_cleanup_cmd='gsutil rm gs://myBucket/google/gam/example_report/*report_example_using_service_account_with_date_range_'+temp_date+'*'
		bash_cleanup = BashOperator(task_id='bash_cleanup_'+temp_date,retries=0,bash_command=bash_cleanup_cmd,trigger_rule="all_done")
		##notice trigger_rule="all_done"
		bash_run_report_remotly_cmd='gcloud beta compute --project ynet-data-analytics ssh scheduler2 --internal-ip --zone us-central1-a --command "sudo -u omid python3 /home/omid/gam_data_transfer/ --start '+temp_date+" --end "+temp_date+'"'
		run_report_remotly = BashOperator(task_id='run_report_remotly_'+temp_date,retries=0,bash_command=bash_run_report_remotly_cmd,trigger_rule="all_done")


	##notice trigger_rule="all_done"
	run_gsutil_mv = BashOperator(task_id='bash_gsutil_mv_cmd',retries=0,bash_command=bash_gsutil_mv_cmd,trigger_rule="all_done")
wait >> run_gsutil_mv >> end