AppsFlyer Data Locker Airflow pipeline use case 

Author: Omid Vahdaty 31.3.2020

The business use case:

We wanted the data from AppsFlyer's Data Locker, which is essentially just an AWS S3 bucket. The idea was to sync the data from S3 to our GCS bucket, and from there load it into BigQuery. You need a dedicated machine with a strong network connection for the rsync, since it is a slow operation that may take as long as 40 minutes. We split each folder and synced it separately, in parallel, via Airflow.
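The GCS-to-BigQuery load itself is not part of the DAG shown below. As a rough sketch of that downstream step with the bq CLI (the dataset, table name and file format here are assumptions, not the actual production values):

bq load --source_format=CSV --autodetect \
  data_lake.apps_flyer_installs \
  "gs://data_lake_ingestion_us/apps_flyer/t=installs/*.gz"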

Set up gsutil authentication to read from AWS S3

1. Run in your GCE instance, separate from the Airflow machine:
gsutil config -a

2. Go to GCP storage settings

https://console.cloud.google.com/projectselector2/storage/settings?supportedpurview=project

Select your project.

Select “Interoperability”.

Under user account HMAC, create a key.

Copy the access key and secret key into the “gsutil config -a” prompt when asked.

This will create a boto file:

“/home/omid/.boto”

3. Configure the S3 access key and secret key in the boto file (additional configurations) under [Credentials]:

  aws_access_key_id = xxx
  aws_secret_access_key = yyy

4. You should now be able to run any copy/move/sync command with gsutil against your AWS S3 bucket.
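A quick way to confirm the setup works is to inspect the [Credentials] section of the boto file and list the bucket directly. The key values below are illustrative placeholders; the Data Locker path is the same one used in the DAG further down:

  [Credentials]
  # written by "gsutil config -a" (GCS HMAC key from the Interoperability page)
  gs_access_key_id = GOOGxxxxxxxxxxxxxxxx
  gs_secret_access_key = zzz
  # added manually - AWS keys provided by AppsFlyer for the Data Locker bucket
  aws_access_key_id = xxx
  aws_secret_access_key = yyy

  # sanity check: this should list the hourly Data Locker folders
  gsutil ls s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/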

Below is an Airflow example that runs these gsutil commands on a remote machine (it is not healthy to run them on the Airflow machine itself). The DAG is also committed in our GitHub.

# this assumes you have configured HMAC authentication via the boto file so gsutil can read from AWS S3
# each command ssh-es into a dedicated GCE instance and runs gsutil rsync there, keeping the load off the Airflow machine
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

rsync_uninstalls_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=uninstalls/ gs://data_lake_ingestion_us/apps_flyer/t=uninstalls/"'
rsync_installs_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=installs/ gs://data_lake_ingestion_us/apps_flyer/t=installs/"'
rsync_organic_uninstall_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=organic_uninstalls/ gs://data_lake_ingestion_us/apps_flyer/t=organic_uninstalls/"'

# default_dag_args is defined elsewhere in the DAG file (see the sketch below)
with models.DAG(
        'apps_flyer_sync_data_locker',
        # run the DAG once per hour
        schedule_interval='@hourly',
        default_args=default_dag_args) as dag:

    # dummy operators - proceed only if the syncs succeed
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    rsync_uninstalls = BashOperator(task_id='rsync_uninstalls', bash_command=rsync_uninstalls_cmd)
    rsync_installs = BashOperator(task_id='rsync_installs', bash_command=rsync_installs_cmd)
    rsync_organic_uninstall = BashOperator(task_id='organic_uninstall', bash_command=rsync_organic_uninstall_cmd)

    # the three folder syncs run in parallel between start and end
    start >> rsync_uninstalls >> end
    start >> rsync_installs >> end
    start >> rsync_organic_uninstall >> end
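The DAG above references default_dag_args without defining it. A minimal sketch of what it could contain is shown below; the owner, start date, and retry settings are placeholder assumptions, not the values actually used in production:

import datetime

# hypothetical defaults - adjust owner, start_date and retries to fit your environment
default_dag_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2020, 3, 1),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}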


——————————————————————————————————————————

I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me via LinkedIn:
