Google Search Console API via Airflow

Author: Omid Vahdaty 22.3.2020

  1. Start here, by Google
  2. Use a Gmail account that is verified in Google Search Console (for example, your own email).
  3. Get API credentials using the setup tool from the link above.

Select your GCP project.


Which API are you using?

Google Search Console API

Where will you be calling the API from?

Application data

Are you planning to use this API with App Engine or Compute Engine?


My answer led the tool to tell me that I don't need special authentication and can use my own Gmail credentials. I chose to create an OAuth client ID and a service account anyway. You need the client ID and client secret for the Python script.

Other UI and user data? -> service account


Options for getting started with the Google Search Console API via Python:

Once you get the list of websites, you have passed authentication. If the script returns no errors and no websites, try another Gmail user.
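
As a rough illustration of that check, here is a minimal sketch that lists the sites visible to the authenticated user. It assumes google-api-python-client is installed and that you already hold a credentials object from your own OAuth flow; the helper names build_service and list_verified_sites are made up for this example and are not part of Google's samples.

```python
def build_service(credentials):
    """Create a Search Console (webmasters v3) client from existing credentials."""
    # Imported lazily so the rest of this sketch can be read and tested
    # without the Google client library installed.
    from googleapiclient.discovery import build
    return build("webmasters", "v3", credentials=credentials)


def list_verified_sites(service):
    """Return (siteUrl, permissionLevel) pairs visible to the authenticated user."""
    response = service.sites().list().execute()
    # The 'siteEntry' key is absent when the account has no properties.
    return [(entry["siteUrl"], entry["permissionLevel"])
            for entry in response.get("siteEntry", [])]
```

An empty list with no error matches the "no errors and no websites" case above: authentication worked, but this user has no properties.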

Once you are past the authentication issues, follow the manual below for detailed data report options:

  • Install the Python package:

pip install --upgrade google-api-python-client

  • Open the samples repository and copy both client_secrets.json and to your local directory.
  • Edit client_secrets.json, which you copied earlier, replacing client_id and client_secret with your own values from the previous section.
  • Run:

python 2015-05-01 2015-05-30
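
Before running the sample, it can help to confirm that client_secrets.json really holds your own values. The sketch below is only a sanity check under assumptions: it expects the usual layout with a top-level "installed" or "web" section, and it treats the ".apps.googleusercontent.com" suffix of the client ID as a heuristic. The function names are hypothetical.

```python
import json


def find_secret_problems(data):
    """Return a list of human-readable problems found in a parsed client_secrets dict."""
    section = data.get("installed") or data.get("web")
    if section is None:
        return ["missing top-level 'installed' or 'web' section"]
    problems = []
    for key in ("client_id", "client_secret"):
        if not str(section.get(key) or "").strip():
            problems.append("'%s' is empty or missing" % key)
    client_id = str(section.get("client_id") or "")
    # Heuristic only: Google OAuth client IDs normally end with this suffix.
    if client_id and not client_id.endswith(".apps.googleusercontent.com"):
        problems.append("client_id does not look like a Google OAuth client ID")
    return problems


def check_client_secrets(path="client_secrets.json"):
    """Load the file at path and report problems (an empty list means it looks fine)."""
    with open(path, encoding="utf-8") as handle:
        return find_secret_problems(json.load(handle))
```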


Notice if you used


An authentication file was created:



Caveats and common errors while connecting to the Google Search Console API:

  1. If the error reason is still "not authorized", try another Gmail user in Chrome incognito mode.
  2. Consider using the Gmail user that has property owner permissions.
  3. Pay attention to the --noauth_local_webserver flag; it might help with authentication errors. Use a Chrome incognito window and log in with the authorized user for this.
  4. Make sure the site URL is accurate and in the expected form, or you will get an authentication error.


Once you have everything up and running, you are going to need the dimensions and filter documentation:


Our Python script to get metrics for the dimensions (country, device, page, query):
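
The original script is not reproduced here. As a stand-in, the following is a minimal sketch of such a query, not the author's code. It assumes a service object built with google-api-python-client for the webmasters v3 API; the function names build_query_body and query_metrics are invented for illustration.

```python
def build_query_body(start_date, end_date,
                     dimensions=("country", "device", "page", "query"),
                     row_limit=25000, start_row=0):
    """Build the request body for one searchanalytics.query call."""
    return {
        "startDate": start_date,        # YYYY-MM-DD
        "endDate": end_date,            # YYYY-MM-DD, included in the range
        "dimensions": list(dimensions),
        "rowLimit": row_limit,
        "startRow": start_row,
    }


def query_metrics(service, site_url, body):
    """Run one query and return one flat dict per result row."""
    response = service.searchanalytics().query(siteUrl=site_url, body=body).execute()
    records = []
    for row in response.get("rows", []):
        # row["keys"] holds the dimension values, in the order they were requested.
        record = dict(zip(body["dimensions"], row["keys"]))
        for metric in ("clicks", "impressions", "ctr", "position"):
            record[metric] = row.get(metric)
        records.append(record)
    return records
```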



If you are using a Domain property name instead of a URL, use the below.
Sites of type "Domain" are now prefixed with "sc-domain:".
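
A small sketch of how the site identifier could be normalized before calling the API. The helper name normalize_site_url is hypothetical, and the rule "no http(s) scheme means Domain property" is an assumption made for this example.

```python
def normalize_site_url(value):
    """Return the siteUrl string to pass to the API for a property name."""
    value = value.strip()
    if value.startswith("sc-domain:"):
        return value                      # already a Domain property identifier
    if value.startswith(("http://", "https://")):
        return value                      # URL-prefix property, used as-is
    # Assumption: a bare host name refers to a Domain property.
    return "sc-domain:" + value
```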


python 2020-02-01 2020-02-02

Notice 2: when you want all the data (there is a limit of 25,000 rows per call):
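
One way to work within the per-call row limit is to page with startRow until a short page comes back. This is a minimal sketch under the same assumption of a webmasters v3 service object; fetch_all_rows is a made-up name. Exceptions are deliberately not caught, so a failed page aborts the whole fetch instead of leaving partial data.

```python
def fetch_all_rows(service, site_url, body, page_size=25000):
    """Fetch every row for a query by paging with startRow and rowLimit."""
    all_rows = []
    start_row = 0
    while True:
        # Copy the caller's body and override only the paging fields.
        page_body = dict(body, rowLimit=page_size, startRow=start_row)
        response = service.searchanalytics().query(
            siteUrl=site_url, body=page_body).execute()
        rows = response.get("rows", [])
        all_rows.extend(rows)
        if len(rows) < page_size:
            break                         # a short (or empty) page is the last one
        start_row += page_size
    return all_rows
```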

Notice 3: the Search Console API limits the number of queries per day, TBD.

Notice 4: the API keeps data going 16 months back.

Notice 5: use the MozCast report to track changes in the Google algorithm.

Notice 6: The data brought from the API will be 100% accurate.
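
Regarding the 16-month window in Notice 4, a backfill can clamp its start date so it does not request days the API no longer keeps. The sketch below approximates the cutoff with calendar-month arithmetic; the exact boundary applied by the API is not verified here, and both function names are hypothetical.

```python
import calendar
import datetime


def earliest_available_date(today, months_back=16):
    """Approximate the oldest date still available, months_back months before today."""
    total_months = today.year * 12 + (today.month - 1) - months_back
    year, month_index = divmod(total_months, 12)
    month = month_index + 1
    # Clamp the day for shorter months (for example, the 31st mapped into a 30-day month).
    last_day = calendar.monthrange(year, month)[1]
    return datetime.date(year, month, min(today.day, last_day))


def clamp_start_date(requested, today):
    """Move a requested start date forward if it falls outside the window."""
    return max(requested, earliest_available_date(today))
```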


– Make sure to load all rows (loop with startRow and rowLimit (25000) until there are no more rows). Important: identify errors to avoid partial data loading.
  * Need to go over each search_type: image / video / web.
  * search_type should be a field in the table schema, but each type needs a separate API call.

– Pay attention to quota.

– Pay attention to the data aggregation method (page vs. property).
– Initial tables to use:
  * Aggregated data by device, country: to get accurate high-level numbers.
  * Detailed (date, query, page, country, device): to drill down on specific queries/pages.
I'm not specifying all fields.
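
The notes above can be sketched as a small request plan: one query body per table and per search type. The searchType and aggregationType fields belong to the searchanalytics.query request body; the mapping of the aggregated table to "byProperty" and the detailed table to "auto" is an assumption for this example, as are the names used.

```python
SEARCH_TYPES = ("web", "image", "video")

# Table name -> (dimensions, aggregationType). Grouping by page cannot be
# aggregated by property, so the detailed table leaves the choice to the API.
REPORTS = {
    "aggregated": (["device", "country"], "byProperty"),
    "detailed": (["date", "query", "page", "country", "device"], "auto"),
}


def build_report_requests(start_date, end_date):
    """Return (table_name, search_type, request_body) for every call to make."""
    requests = []
    for table_name, (dimensions, aggregation) in REPORTS.items():
        for search_type in SEARCH_TYPES:
            body = {
                "startDate": start_date,
                "endDate": end_date,
                "dimensions": list(dimensions),
                "searchType": search_type,      # one API call per search type
                "aggregationType": aggregation,
                "rowLimit": 25000,
                "startRow": 0,
            }
            requests.append((table_name, search_type, body))
    return requests
```

The search type is returned next to each body so it can be written as a column when loading the rows, since the API response itself does not carry it.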

For an explanation of quotas and limits in Search Console, read this blog.
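
When a quota is hit, retrying with a growing delay is a common pattern. The sketch below is generic and rests on assumptions: it treats HTTP status 403 and 429 as quota-related, read from the resp.status attribute that googleapiclient's HttpError exposes. The function names are invented for this example.

```python
import time


def is_quota_error(exc):
    """Guess whether an exception is a quota or rate-limit error (assumed: 403 or 429)."""
    status = getattr(getattr(exc, "resp", None), "status", None)
    return status in (403, 429)


def call_with_backoff(func, is_retryable, max_attempts=5, base_delay=1.0,
                      sleep=time.sleep):
    """Call func(), retrying retryable errors with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            # Give up on the last attempt or on errors that retrying cannot fix.
            if attempt == max_attempts or not is_retryable(exc):
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

A call would then look like call_with_backoff(lambda: request.execute(), is_quota_error), where request is any prepared API request object.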


Full Airflow DAG that helps call the Search Console API with concurrency, retries, and quotas (also committed in our git):

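import datetime
import os
import logging
from datetime import timedelta, date
from airflow import DAG
from airflow import models
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import gcs_to_bq
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import gcs_to_gcs
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.utils import trigger_rule
# The module path of the original "from ... import storage" line is missing in
# the source; the name is not used below, so the import is left out.
# Midnight of the previous day. The two expressions stripped from the source are
# assumed to be datetime.datetime.today() and datetime.datetime.min.time().
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())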
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}
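def daterange(start_date, end_date):
    """Yield each date from start_date up to, but not including, end_date."""
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)
### start & end date = delta period.
## -3 days?
# Assumption: the expressions stripped from the source were "today". delta is
# not defined in the source, so -3 follows the comment above.
delta = -3
start_date = date.today() + datetime.timedelta(delta)
end_date = date.today()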
# Base command; the script path after /home/omid/ is truncated in the source.
bash_run_report_remotly_cmd='gcloud beta compute --project gap---all-sites-1245 ssh search-console --internal-ip --zone us-central1-c --command "sudo -u omid python /home/omid/"'
### init variables
def get_alphanumeric_task_id(a_string):
    """Keep only the alphanumeric characters of a_string, for use as a task_id."""
    # '/' and '.' are not alphanumeric, so the filter already drops them.
    return "".join(filter(str.isalnum, a_string))
with models.DAG(
        # Placeholder DAG id: the original id is missing in the source.
        'search_console',
        # Continue to run DAG once per day (interval assumed; missing in the source)
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    #dummy - proceed only if success
    start = DummyOperator(task_id='start')
    wait = DummyOperator(task_id='wait')
    end = DummyOperator(task_id='end')
    for single_date in daterange(start_date, end_date):
        # Dates as strings; YYYY-MM-DD is assumed from the sample commands above.
        temp_date = single_date.strftime("%Y-%m-%d")
        day_after_single_date = (single_date + datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        ##notice trigger_rule="all_done"
        bash_run_report_remotly_cmd='gcloud beta compute --project gap---all-sites-1245 ssh search-console --internal-ip --zone us-central1-c --command "sudo -u omid python /home/omid/ '+temp_date+" "+day_after_single_date+'"'
        run_report_remotly = BashOperator(
            task_id='run_report_remotly_'+temp_date,
            retries=2,
            retry_delay=datetime.timedelta(minutes=15),
            retry_exponential_backoff=True,
            max_retry_delay=datetime.timedelta(hours=48),
            bash_command=bash_run_report_remotly_cmd,
            trigger_rule="all_done")
        # Assumed wiring (not in the source): each daily report runs between start and wait.
        start >> run_report_remotly >> wait
    mv_to_data_lake = BashOperator(task_id='mv_to_data_lake',bash_command='gcloud beta compute --project gap---all-sites-1245 ssh search-console --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m mv -r  /tmp/search* gs://data_lake_ingestion_us/search_console/"',dag=dag)
    load="""bq --location US load --source_format CSV --replace=true --skip_leading_rows 1 --allow_quoted_newlines DATA_LAKE_INGESTION_US.search_console_partition gs://data_lake_ingestion_us/search_console/*"""
    load_to_data_lake = BashOperator(task_id='load_to_data_lake',bash_command=load,dag=dag)
    wait >> mv_to_data_lake >> load_to_data_lake >> end



I put a lot of thought into these blogs so I could share the information in a clear and useful way. If you have any comments, thoughts, or questions, or you need someone to consult with, feel free to contact me via LinkedIn.
