
GCP Cost Reduction in a nutshell

Author: Ilan Rosen

In this article we cover the key factors in reducing BigQuery costs, from writing queries properly to building a data architecture that supports one goal – reducing costs.

Queries in BQ Cost Money

Pricing in BQ follows a pay-as-you-go model: $5 per 1 TB of data scanned when executing your query. For example, a query that scans 200 GB costs roughly 200/1024 × $5 ≈ $0.98.

Rule of thumb – in most cases, if your query costs more than $1, you are probably doing something wrong.

Be in control of your cost – this tool can help you dramatically with that:

Do’s and Don'ts

  1. Try to avoid using “SELECT *” (see the sketch below this list).
    This way you avoid paying to scan columns that you don’t actually need in your query result.
  2. Using the “LIMIT” clause on a query does not reduce the amount of data scanned.
  3. Use the “Table Preview” option to see a few rows of a table (instead of using LIMIT) – this is free.
  4. There is also a more extreme protection level under the Query settings – Advanced options section: “Maximum bytes billed”.
    Using this will limit the bytes billed for the query. If the query would bill more bytes than this limit, it fails without incurring a charge.
  5. Use the default table expiration time to remove data when it’s no longer needed – this saves storage costs.
  6. Use streaming inserts only if your data must be immediately available to query. In other words – don’t use streaming inserts unless you have to! They cost money.
  7. Quotas – you can define a hard limit on data scanned at a daily or even project level, but not at the user level. It’s good practice to set a daily budget that keeps you in the safe zone in terms of cost.
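
As a minimal sketch of the first rule (the project, dataset, table, and column names here are hypothetical), name only the columns you need and filter as early as possible:

-- Don't: pays for a scan of every column in the table
-- SELECT * FROM `my_project.my_dataset.events`;

-- Do: pays only for the columns the result actually needs
SELECT
  user_id,
  event_name,
  event_date
FROM `my_project.my_dataset.events`
WHERE event_date = '2020-01-01';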

Performance (& Cost) Key Factors

  1. Partition your tables – the significantly big ones! (See the sketch below this list.)
    This improves both query performance and cost.
  2. There are several ways to partition tables; the most common, and most of the time also the most cost- and performance-effective, is the time-partitioned tables method.
  3. Denormalize when possible – BigQuery by its nature is designed for one huge table. Implementing normalized table relations results in many joins and overhead in terms of performance and cost.
  4. Use “ORDER BY” only in the outermost query or within window clauses (analytic functions). Push complex operations to the end of the query.
  5. Avoid self-joins. Use a window function instead.
  6. The same applies to cross-joins.
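
As a minimal sketch of a time-partitioned table (all names are hypothetical), a query that filters on the partitioning column scans – and bills – only the matching partitions:

-- Create a table partitioned by day on the event_ts column
CREATE TABLE `my_project.my_dataset.events_partitioned`
PARTITION BY DATE(event_ts) AS
SELECT * FROM `my_project.my_dataset.events_raw`;

-- Only the 2020-01-01 partition is scanned (and billed)
SELECT user_id, event_name
FROM `my_project.my_dataset.events_partitioned`
WHERE DATE(event_ts) = '2020-01-01';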

Cost reducing Data Architecture Guidelines

  1. As mentioned at the top of the article – the key principle that guides us when designing a cost-effective architecture is to reduce the amount of data scanned by the end user.
  2. In most cases the end user is an analyst or a business user using a BI platform.
  3. We want to reduce the amount of data scanned both in terms of cost and, let’s not forget, performance.
    You don’t want your CEO to wait more than a few seconds for a dashboard to be presented with fresh data.
  4. The way to reduce the data is a simple layering method that starts with the raw data (the ingestion layer) and ends with a very small portion of it, designed and grouped specifically for the BI tool of your choice (the presentation layer).
  5. In between those layers you should do all the cleansing / transforming / joining / aggregating needed so that the top layer is optimized for fast and cheap queries for your BI visualizations.
  6. We strongly recommend decoupling your compute and storage.
    A common way to ensure that is to use Airflow to orchestrate all of your data pipelines.
  7. The data operations on each level can be executed by a tool of your choice. To simplify the process, a good practice is to start by implementing it solely with internal views (see the SQL sketch below).
  8. This is what the layered architecture looks like eventually:
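
As a minimal SQL sketch of the view-based layering described above (all project, dataset, table, and column names are hypothetical):

-- Transformation layer: a cleansed, aggregated view on top of the raw ingestion layer
CREATE VIEW `my_project.staging.daily_sessions` AS
SELECT user_id, DATE(event_ts) AS event_date, COUNT(*) AS events
FROM `my_project.raw.events`
GROUP BY user_id, event_date;

-- Presentation layer: a small, partitioned table that the BI tool queries directly
CREATE TABLE `my_project.presentation.daily_kpis`
PARTITION BY event_date AS
SELECT event_date, COUNT(DISTINCT user_id) AS active_users, SUM(events) AS total_events
FROM `my_project.staging.daily_sessions`
GROUP BY event_date;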

——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way.
If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:


Data Engineering Use Cases Demystified | Online meetup

What will you learn:

– We are going to cover the basics of creating an architecture, and then we will build our own architecture LIVE on a made-up use case.

– We are going to deal with the following questions:
Suggest a data lake architecture – how would you do it?
Batch / streaming?
What are the orchestration challenges?
Which technology stack should be used to implement the above?
Are the existing data sources good enough to get started?
Is your use case ROI positive?

Meetup video

Meetup slides:


——————————————————————————————————————————
I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:

Register for our meetup:

Subscribe to our YouTube channel:


Connecting to Facebook API via Airflow

This is a “cut the bullshit and give me what I need” blog.

Prerequisites – Connecting to Facebook API via Airflow

  1. You need permission from Facebook Business, usually on your own personal Facebook user – you need admin!
  2. SDK link: https://developers.facebook.com/docs/marketing-api/sdks/
  3. I picked the Facebook SDK for Python.
  4. You need an app registered on https://developers.facebook.com/products/ with the Marketing API selected.
  5. For security, it is recommended that you turn on ‘App Secret Proof for Server API calls’ in your app’s Settings -> Advanced page.
  6. Get an access token from your app’s Settings -> Advanced page.
  7. Get the app secret from the app’s Settings -> Basic page.
  8. Get the ad account ID from Facebook – Ad Account Setup.
  9. A verified business account on Facebook; verify it on your app.

Test via:

curl -X GET "https://graph.facebook.com/oauth/access_token
  ?client_id={your-app-id}
  &client_secret={your-app-secret}
  &grant_type=client_credentials"

Setup and environment – Connecting to Facebook API via Airflow

  1. Install a GCE machine with Debian.
  2. Install git and pip.
  3. Git clone the source code of the Python SDK.
  4. pip install facebook_business

Run the example code:

import sys
sys.path.append('/opt/homebrew/lib/python2.7/site-packages')  # replace with the path where pip installed facebook_business
sys.path.append('/opt/homebrew/lib/python2.7/site-packages/facebook_business-3.0.0-py2.7.egg-info')  # same as above

from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount

my_app_id = 'your-app-id'
my_app_secret = 'your-app-secret'
my_access_token = 'your-page-access-token'

# Initialize the API with the app credentials and token
FacebookAdsApi.init(my_app_id, my_app_secret, my_access_token)

# 'act_' is a placeholder – append your ad account id, e.g. 'act_<AD_ACCOUNT_ID>'
my_account = AdAccount('act_')
campaigns = my_account.get_campaigns()
print(campaigns)

Example 2:

"""
Created on Mon Apr 27 21:37:08 2020

@author: tomerb
"""
from facebook_business.adobjects.adset import AdSet
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.api import FacebookAdsApi


access_token = 'token'
app_secret = 'secret'
ad_account_id = 'act_123456789'
FacebookAdsApi.init(access_token=access_token,app_secret=app_secret)

fields = [
  'impressions','clicks','spend'
]
fields =  ['campaign_name', 'ad_name', 'impressions', 'inline_link_clicks', 'spend']
params = {
        
  'breakdown': 'publisher_platform',
}


print( AdSet(ad_account_id).get_insights(
  fields=fields,
  params=params,
))

Full Python example to access the Facebook Marketing API:

https://github.com/omidvd79/Big_Data_Demystified/blob/master/facebook_api/marketing_api_example.py

Full Python example to access the Facebook Marketing API with group by country and date:

https://github.com/omidvd79/Big_Data_Demystified/blob/master/facebook_api/facebook_api_example_with_args.py

Airflow dag: TBD

The Facebook API documentation of available fields and params:

https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0

https://developers.facebook.com/docs/marketing-api/insights

https://github.com/facebook/facebook-python-business-sdk/tree/master/examples

https://developers.facebook.com/docs/marketing-api/insights/breakdowns/

https://stackoverflow.com/questions/49658991/get-facebook-marketing-api-ads-insights-results-as-csv-or-json-format


——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with,

feel free to contact me via LinkedIn:


AWS Athena & Presto Cheat sheet

This is a quick “cut the bullshit and give me what I need” blog.

Your biggest problem in AWS Athena is how to create a table 🙂

Create table with pipe separator

CREATE EXTERNAL TABLE logs (
    id STRING,
    query STRING
)
    ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '|'
      ESCAPED BY '\\'
      LINES TERMINATED BY '\n'
    LOCATION 's3://myBucket/logs';

Create table with CSV SerDe

CREATE EXTERNAL TABLE IF NOT EXISTS logs(
 `date` string,
 `query` string 
 )

 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
 LOCATION 's3://omidongage/logs'
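
If your CSV files use quoted fields, the same SerDe also accepts explicit properties. A sketch (the table name is hypothetical; the property values shown are the OpenCSVSerde defaults):

CREATE EXTERNAL TABLE IF NOT EXISTS logs_csv(
 `date` string,
 `query` string
 )
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
 WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar' = '"',
   'escapeChar' = '\\'
 )
 LOCATION 's3://omidongage/logs'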

Create table with partition and parquet

CREATE EXTERNAL TABLE users (
first string,
last string,
username string
)
PARTITIONED BY (id string)
STORED AS parquet
LOCATION 's3://bucket/folder/'
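
Note that Athena does not see the partitions of a table like this until they are loaded into the metastore. A minimal sketch (the partition value and path are hypothetical):

-- Load every partition that follows the Hive naming convention (id=.../)
MSCK REPAIR TABLE users;

-- Or add a single partition explicitly
ALTER TABLE users ADD PARTITION (id = '101') LOCATION 's3://bucket/folder/id=101/';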

CTAS examples from the official documentation – a VERY good read!

https://docs.aws.amazon.com/athena/latest/ug/ctas-examples.html
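
As a minimal CTAS sketch (the source table my_raw_logs, its dt column, and the S3 location are hypothetical), writing query results back to S3 as partitioned Parquet:

CREATE TABLE logs_parquet
WITH (
  format = 'PARQUET',
  external_location = 's3://myBucket/logs_parquet/',
  partitioned_by = ARRAY['dt']
) AS
SELECT id, query, dt
FROM my_raw_logs;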

AWS Athena Documentation:

https://docs.aws.amazon.com/athena/latest/ug/create-table.html

https://docs.aws.amazon.com/athena/latest/ug/ctas.html


——————————————————————————————————————————

I put a lot of thought into these blogs so that I can share the information in a clear and useful way. If you have any comments, thoughts, questions, or you need someone to consult with, feel free to contact me:

https://www.linkedin.com/in/omid-vahdaty/


AWS Redshift Cheat Sheet

Basics…

get list of databases

select oid as database_id,
       datname as database_name,
       datallowconn as allow_connect
from pg_database
order by oid;

List table definitions in the public schema

SELECT * FROM pg_table_def WHERE schemaname = 'public';

Cheat sheet for basic SQL operations on Redshift.

Create Schema

create  SCHEMA  test_schema

Create table

create table test_schema.users(
	userid integer not null distkey sortkey,
	username char(8),
	firstname varchar(30),
	lastname varchar(30),
	city varchar(30),
	state char(2),
	email varchar(100),
	phone char(14)
);

CTAS

create table event_backup as select * from event;

CTAS with distkey and sort key

create table myTable2
distkey (col1)
sortkey (col1,col3)
as
select *
from MyTable;

Load (COPY) into a table from S3

COPY test_schema.users FROM 's3://ariel-s3-buket/tickitdb/allusers_pipe.txt' iam_role 'arn:aws:iam::527228915290:role/RedshiftAccessS3' delimiter '|' region 'us-east-1';

Case – If

select venuecity,
case venuecity
when 'New York City'
then 'Big Apple' else 'other'
end
from venue
order by venueid desc;

Casting

select cast(pricepaid as integer)
from sales where salesid=100;

Redshift create table with emphasis on performance

Some good reads… before I summarize it for you.

https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html

https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_examples.html

https://www.flydata.com/blog/amazon-redshift-distkey-and-sortkey/

Use DISTSTYLE or DISTKEY with SORTKEY – create table demystified

DISTSTYLE { AUTO | EVEN | KEY | ALL }

Keyword that defines the data distribution style for the whole table. Amazon Redshift distributes the rows of a table to the compute nodes according to the distribution style specified for the table. The default is AUTO. The distribution style that you select for tables affects the overall performance of your database.

create table venue(
venueid smallint not null,
venuename varchar(100),
venuecity varchar(30))
diststyle all;  

DISTKEY ( column_name )

Keyword that specifies that the column is the distribution key for the table. Only one column in a table can be the distribution key. You can use the DISTKEY keyword after a column name or as part of the table definition by using the DISTKEY (column_name) syntax. Either method has the same effect. For more information, see the DISTSTYLE parameter above. Notice this can cause skews in your cluster storage.

create example with distkey

create table sales(
salesid integer not null,
listid integer not null,
sellerid integer not null,
buyerid integer not null,
eventid integer not null encode mostly16,
dateid smallint not null,
qtysold smallint not null encode mostly8,
pricepaid decimal(8,2) encode delta32k,
commission decimal(8,2) encode delta32k,
saletime timestamp,
primary key(salesid),
foreign key(listid) references listing(listid),
foreign key(sellerid) references users(userid),
foreign key(buyerid) references users(userid),
foreign key(dateid) references date(dateid))
distkey(listid)
compound sortkey(listid,sellerid);

[ { COMPOUND | INTERLEAVED } ] SORTKEY ( column_name [,… ] )

Keyword that specifies that the column is the sort key for the table. When data is loaded into the table, the data is sorted by one or more columns that are designated as sort keys. You can use the SORTKEY keyword after a column name to specify a single-column sort key, or you can specify one or more columns as sort key columns for the table by using the SORTKEY (column_name [, …]) syntax. Only compound sort keys are created with this syntax.

If you don’t specify any sort keys, the table isn’t sorted. You can define a maximum of 400 SORTKEY columns per table.

create example with compound sortkey

create table sales(
salesid integer not null,
listid integer not null,
sellerid integer not null,
buyerid integer not null,
eventid integer not null encode mostly16,
dateid smallint not null,
qtysold smallint not null encode mostly8,
pricepaid decimal(8,2) encode delta32k,
commission decimal(8,2) encode delta32k,
saletime timestamp,
primary key(salesid),
foreign key(listid) references listing(listid),
foreign key(sellerid) references users(userid),
foreign key(buyerid) references users(userid),
foreign key(dateid) references date(dateid))
distkey(listid)
compound sortkey(listid,sellerid);

create example with interleaved sortkey

create table customer_interleaved (
  c_custkey     	integer        not null,
  c_name        	varchar(25)    not null,
  c_address     	varchar(25)    not null,
  c_city        	varchar(10)    not null,
  c_nation      	varchar(15)    not null,
  c_region      	varchar(12)    not null,
  c_phone       	varchar(15)    not null,
  c_mktsegment      varchar(10)    not null)
diststyle all
interleaved sortkey (c_custkey, c_city, c_mktsegment);  

more create examples that will impact your performance…

Before: simple example with sortkey and distkey

create table activity (
  id integer primary key,
  created_at_date date sortkey distkey,
  device varchar(30)
);

After: simple example with sortkey and distkey

create table activity (
  id integer primary key,
  created_at_date date distkey,
  device varchar(30)
)
sortkey (created_at_date, device);

How to view the distkey and sortkey of a table in AWS Redshift?

SELECT * FROM svv_table_info;
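
To narrow it down to a single table, a sketch using a few of the view's columns (the table name is hypothetical):

SELECT "table", diststyle, sortkey1, unsorted, tbl_rows
FROM svv_table_info
WHERE "table" = 'users';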

Redshift Date Manipulation

-- assuming a 13-digit (millisecond) epoch timestamp
date_add('ms', myEpocTimeStamp, '1970-01-01') AS session_datetime,   -- 2020-01-01 03:17:17
trunc(date_add('ms', myEpocTimeStamp, '1970-01-01')) AS session_date -- 2020-01-01

Redshift specific syntax

Table information like sortkeys, unsorted percentage

SELECT * FROM svv_table_info;

Table sizes in GB

SELECT t.name, COUNT(tbl) / 1000.0 AS gb
FROM (
  SELECT DISTINCT datname, id, name
  FROM stv_tbl_perm
  JOIN pg_database ON pg_database.oid = db_id
) AS t
JOIN stv_blocklist ON tbl = t.id
GROUP BY t.name ORDER BY gb DESC;

Table column metadata

SELECT * FROM pg_table_def
WHERE schemaname = 'public'
AND tablename = ...;

Vacuum progress

SELECT * FROM svv_vacuum_progress;

Find tables that need vacuum or analyze

SELECT "database", "schema", "table", unsorted, stats_off
FROM svv_table_info
WHERE unsorted > 20
OR stats_off > 20

The size in MB of each column of each table (actually the number of blocks, but blocks are 1 MB)

SELECT
  TRIM(name) as table_name,
  TRIM(pg_attribute.attname) AS column_name,
  COUNT(1) AS size
FROM
  svv_diskusage JOIN pg_attribute ON
    svv_diskusage.col = pg_attribute.attnum-1 AND
    svv_diskusage.tbl = pg_attribute.attrelid
GROUP BY 1, 2
ORDER BY 1, 2;

List users and groups

SELECT * FROM pg_user;
SELECT * FROM pg_group;

list all databases

SELECT * FROM pg_database;

List the 100 most recent load errors

see http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html

SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 100;

Get the full SQL, plus more query details from a query ID

WITH query_sql AS (
  SELECT
    query,
    LISTAGG(text) WITHIN GROUP (ORDER BY sequence) AS sql
  FROM stl_querytext
  GROUP BY 1
)
SELECT
  q.query,
  userid,
  xid,
  pid,
  starttime,
  endtime,
  DATEDIFF(milliseconds, starttime, endtime)/1000.0 AS duration,
  TRIM(database) AS database,
  (CASE aborted WHEN 1 THEN TRUE ELSE FALSE END) AS aborted,
  sql
FROM
  stl_query q JOIN query_sql qs ON (q.query = qs.query)
WHERE
  q.query = ...
ORDER BY starttime;

Show the most recently executed DDL statements

SELECT
  starttime,
  xid,
  LISTAGG(text) WITHIN GROUP (ORDER BY sequence) AS sql
FROM stl_ddltext
GROUP BY 1, 2
ORDER BY 1 DESC;

Query duration stats per database, user and query group; including the max, median, 99 percentile, etc.


WITH
durations1 AS (
  SELECT
    TRIM("database") AS db,
    TRIM(u.usename) AS "user",
    TRIM(label) AS query_group,
    DATE_TRUNC('day', starttime) AS day,
    -- total_queue_time/1000000.0 AS duration,
    -- total_exec_time/1000000.0 AS duration,
    (total_queue_time + total_exec_time)/1000000.0 AS duration
  FROM stl_query q, stl_wlm_query w, pg_user u
  WHERE q.query = w.query
    AND q.userid = u.usesysid
    AND aborted = 0
),
durations2 AS (
  SELECT
    db,
    "user",
    query_group,
    day,
    duration,
    PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS median,
    PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS p75,
    PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS p90,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS p95,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS p99,
    PERCENTILE_CONT(0.999) WITHIN GROUP (ORDER BY duration) OVER (PARTITION BY db, "user", query_group, day) AS p999
  FROM durations1
)
SELECT
  db,
  "user",
  query_group,
  day,
  MIN(duration) AS min,
  AVG(duration) AS avg,
  MAX(median) AS median,
  MAX(p75) AS p75,
  MAX(p90) AS p90,
  MAX(p95) AS p95,
  MAX(p99) AS p99,
  MAX(p999) AS p999,
  MAX(duration) AS max
FROM durations2
GROUP BY 1, 2, 3, 4
ORDER BY 1, 2, 3, 4;

Currently executing and recently executed queries with status, duration, database, etc.

SELECT
  r.pid,
  TRIM(status) AS status,
  TRIM(db_name) AS db,
  TRIM(user_name) AS "user",
  TRIM(label) AS query_group,
  r.starttime AS start_time,
  r.duration,
  r.query AS sql
FROM stv_recents r LEFT JOIN stv_inflight i ON r.pid = i.pid;

show remote host and port of running queries

SELECT
  recents.pid,
  TRIM(db_name) AS db,
  TRIM(user_name) AS "user",
  TRIM(label) AS query_group,
  recents.starttime AS start_time,
  recents.duration,
  recents.query AS sql,
  TRIM(remotehost) AS remote_host,
  TRIM(remoteport) AS remote_port
FROM stv_recents recents
LEFT JOIN stl_connection_log connections ON (recents.pid = connections.pid)
LEFT JOIN stv_inflight inflight ON recents.pid = inflight.pid
WHERE TRIM(status) = 'Running'
AND event = 'initiating session';

Show user permissions

WITH
  users AS (
    SELECT usename AS user_name FROM pg_user
  ),
  objects AS (
    SELECT
      schemaname AS schema_name,
      'table' AS object_type,
      tablename AS object_name,
      schemaname + '.' + tablename AS full_object_name
    FROM pg_tables
    WHERE schemaname NOT IN ('pg_internal')
    UNION
    SELECT
      schemaname AS schema_name,
      'view' AS object_type,
      viewname AS object_name,
      schemaname + '.' + viewname AS full_object_name
    FROM pg_views
    WHERE schemaname NOT IN ('pg_internal')
  )
SELECT
  schema_name,
  object_name,
  object_type,
  user_name,
  HAS_TABLE_PRIVILEGE(users.user_name, full_object_name, 'select') AS "select",
  HAS_TABLE_PRIVILEGE(users.user_name, full_object_name, 'insert') AS "insert",
  HAS_TABLE_PRIVILEGE(users.user_name, full_object_name, 'update') AS "update",
  HAS_TABLE_PRIVILEGE(users.user_name, full_object_name, 'delete') AS "delete",
  HAS_TABLE_PRIVILEGE(users.user_name, full_object_name, 'references') AS "references"
FROM users, objects
ORDER BY full_object_name;

Credit for this blog goes to:

Omid Vahdaty, Ilan Rosen, Ariel Yoef