AppsFlyer Data Locker Airflow pipeline use case
Author: Omid Vahdaty 31.3.2020
The business use case:
We wanted to ingest the data from AppsFlyer Data Locker, which is essentially just an AWS S3 bucket. The idea was to sync the data from S3 to our GCS bucket, and from there into BigQuery. You need a dedicated machine with a strong network for the rsync (a slow operation that may take as long as 40 minutes). We split each folder and synced it separately, in parallel, via Airflow.
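This post focuses on the S3 to GCS sync. For the GCS to BigQuery leg, a minimal sketch with the bq CLI could look like the following; the dataset and table names and the CSV format are assumptions for illustration, not taken from our actual pipeline:

# Hypothetical load of the synced Data Locker files from GCS into BigQuery.
# data_lake.apps_flyer_installs is a placeholder dataset.table; adjust the format/schema to your report.
bq load \
  --source_format=CSV \
  --autodetect \
  data_lake.apps_flyer_installs \
  "gs://data_lake_ingestion_us/apps_flyer/t=installs/*"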
Set up gsutil authentication to read from AWS S3
1. Run this on your GCE instance (separate from the Airflow machine):
gsutil config -a
2. Go to GCP storage settings
https://console.cloud.google.com/projectselector2/storage/settings?supportedpurview=project
Select your project.
Select “Interoperability”.
Under “User account HMAC”, create a key.
Copy the access key / secret key into the “gsutil config -a” prompts when asked.
This will create a boto config file:
/home/omid/.boto
3. Configure the S3 access key and secret key in the boto file, under [Credentials] (additional configuration):
aws_access_key_id = xxx
aws_secret_access_key = yyy
4. You should now be able to run any copy/move/sync command with gsutil against your AWS S3 bucket, for example the sanity checks below.
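A quick way to verify the credentials work is to list the Data Locker bucket and do a dry-run sync of a single folder (the paths below are the same ones used later in this post; the -n flag makes rsync only list what would be copied, without copying):

# List the Data Locker bucket to verify the HMAC/S3 credentials work:
gsutil ls s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/
# Dry-run sync of a single folder to GCS:
gsutil -m rsync -r -n s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=installs/ gs://data_lake_ingestion_us/apps_flyer/t=installs/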
Below is an Airflow example that runs these gsutil commands on a remote machine (it is not healthy to run them on the Airflow machine itself). The DAG is also committed in our GitHub.
# This assumes you have configured HMAC authentication in the boto file so gsutil can read from AWS S3 (see the steps above).
import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# Minimal default args - adjust start_date/retries to your environment.
default_dag_args = {
    'start_date': datetime.datetime(2020, 3, 1),
    'retries': 1,
}

rsync_uninstalls_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=uninstalls/ gs://data_lake_ingestion_us/apps_flyer/t=uninstalls/"'

rsync_installs_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=installs/ gs://data_lake_ingestion_us/apps_flyer/t=installs/"'

rsync_organic_uninstall_cmd = 'gcloud beta compute --project gap---all-sites-1245 ssh apps-flyer --internal-ip --zone us-central1-c --command "sudo -u omid gsutil -m rsync -r -x \".*_SUCCESS.*$\" s3://af-ext-reports/6abc-acc-SuFd4CoB/data-locker-hourly/t=organic_uninstalls/ gs://data_lake_ingestion_us/apps_flyer/t=organic_uninstalls/"'

with models.DAG(
        'apps_flyer_sync_data_locker',
        # Run the sync once per hour.
        schedule_interval='@hourly',
        default_args=default_dag_args) as dag:

    # Dummy operators - downstream tasks proceed only on success.
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    rsync_uninstalls = BashOperator(
        task_id='rsync_uninstalls',
        bash_command=rsync_uninstalls_cmd)

    rsync_installs = BashOperator(
        task_id='rsync_installs',
        bash_command=rsync_installs_cmd)

    rsync_organic_uninstall = BashOperator(
        task_id='organic_uninstall',
        bash_command=rsync_organic_uninstall_cmd)

    # The three rsync tasks run in parallel between start and end.
    start >> rsync_uninstalls >> end
    start >> rsync_installs >> end
    start >> rsync_organic_uninstall >> end
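Note that each BashOperator only runs gcloud beta compute ssh from the Airflow worker, so the heavy gsutil rsync traffic happens on the remote apps-flyer instance rather than on the Airflow machine. This assumes the Airflow worker is allowed to SSH into that instance, and the --internal-ip flag assumes both machines can reach each other over the VPC's internal network.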
——————————————————————————————————————————
I put a lot of thought into these blogs, so I could share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,
feel free to contact me via LinkedIn: