October 20, 2017

Spark SQL Query Explain Plan

DSE 5.0.6, Spark 1.6.2

Example 1

-- Transactions keyed by the composite partition key (txn_status, txn_date).
-- Within a partition, rows are clustered by txn_time (newest first) and then
-- by txn_id. Table options such as CLUSTERING ORDER BY must follow the closing
-- parenthesis of the column list, not sit inside it.
CREATE TABLE avm.transactions (
	txn_status text,
	txn_date date,
	user_name text,
	merchant_name text,
	txn_amt int,
	txn_id uuid,
	txn_time timestamp,
	PRIMARY KEY ((txn_status, txn_date), txn_time, txn_id)
) WITH CLUSTERING ORDER BY (txn_time DESC, txn_id ASC);

From cqlsh, below query works

cassandra@cqlsh:avm> select * from transactions where txn_status IN ('SUCCESS','FAILED') and txn_date IN 
('2017-07-16', '2017-07-17') limit 10;  

The query below does a full table scan (FTS):

spark-sql>select * from transactions where txn_date IN (cast('2017-07-16' as date),cast('2017-07-17' as date))  
and txn_status in ('SUCCESS','FAILED') AND MERCHANT_NAME='merchant1' limit 10  

The query below does not do an FTS, because the leading partition key column, txn_status, is restricted with an equality predicate:

spark-sql>select * from transactions where txn_date IN (cast('2017-07-16' as date),cast('2017-07-17' as date)) and 
txn_status= 'SUCCESS' AND MERCHANT_NAME='merchant1' limit 10  

The query below returns no rows at all if the txn_date values are not cast to date:

spark-sql>select * from transactions where txn_date IN ('2017-07-16', '2017-07-17') and txn_status= 'SUCCESS' 
AND MERCHANT_NAME='merchant1' limit 10  

Example 2

-- Transactions keyed by the composite partition key (userid, txn_month).
-- Within a partition, rows are clustered by txn_date (newest first) and then
-- by txn_id. Table options such as CLUSTERING ORDER BY must follow the closing
-- parenthesis of the column list, not sit inside it.
CREATE TABLE avm.transactions_by_userid_month (
	txn_status text,
	txn_date date,
	user_name text,
	userid text,
	merchant_name text,
	txn_amt int,
	txn_id uuid,
	txn_month int,
	PRIMARY KEY ((userid, txn_month), txn_date, txn_id)
) WITH CLUSTERING ORDER BY (txn_date DESC, txn_id ASC);
spark-sql> explain select txn_amt from avm.transactions_by_userid_month where userid='11111' and txn_month=201711;  
== Physical Plan ==  
Project [txn_amt#58L]  
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@4ec1192f[txn_amt#58L] PushedFilters: [EqualTo(userid,11111), EqualTo(txn_month,201711)]  
Time taken: 0.413 seconds, Fetched 3 row(s)  

spark-sql> explain select txn_amt from avm.transactions_by_userid_month where userid='11111' and txn_month in (201711,201710);  
== Physical Plan ==  
Project [txn_amt#58L]  
+- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@4ec1192f[txn_amt#58L] PushedFilters: [EqualTo(userid,11111), In(txn_month,[Ljava.lang.Object;@344db28d)]  
Time taken: 0.685 seconds, Fetched 3 row(s)  

Example 3

-- Transactions keyed by the composite partition key (txn_status, txn_date).
-- Within a partition, rows are clustered by txn_time (newest first) and then
-- by txn_id. Table options such as CLUSTERING ORDER BY must follow the closing
-- parenthesis of the column list, not sit inside it.
CREATE TABLE avm.transactions (
	txn_status text,
	txn_date date,
	user_name text,
	merchant_name text,
	txn_amt int,
	txn_id uuid,
	txn_time timestamp,
	PRIMARY KEY ((txn_status, txn_date), txn_time, txn_id)
) WITH CLUSTERING ORDER BY (txn_time DESC, txn_id ASC);

spark-sql> explain select user_name,merchant_name,txn_status,txn_amt from transactions where txn_date IN 
(cast('2017-06-30' as date),cast('2017-07-15' as date)) and txn_status= 'SUCCESS' and 
txn_time >= cast('2017-06-30' as timestamp) and txn_time <= cast('2017-07-15' as timestamp);  
== Physical Plan ==  
Project [user_name#12,merchant_name#13,txn_status#45,txn_amt#39]  
+- Filter ((txn_time#14 >= 1498780800000000) && (txn_time#14 <= 1500076800000000))  
 +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3c12f87d[txn_time#14,txn_amt#39,user_name#12,merchant_name#13,txn_status#45] PushedFilters: [EqualTo(txn_status,SUCCESS), In(txn_date,[Ljava.lang.Object;@2de3ac17), GreaterThanOrEqual(txn_time,2017-06-30 00:00:00.0), LessThanOrEqual(txn_time,2017-07-15 00:00:00.0)]  
Time taken: 0.555 seconds, Fetched 4 row(s)  

spark-sql> explain select user_name,merchant_name,txn_status,txn_amt from transactions where txn_date =(cast('2017-06-30' as date)) and txn_status IN ('SUCCESS','FAILED') and txn_time >= cast('2017-06-30' as timestamp) and txn_time <= cast('2017-07-15' as timestamp);  
== Physical Plan ==  
Project [user_name#12,merchant_name#13,txn_status#45,txn_amt#39]  
+- Filter (((txn_status#10 IN (SUCCESS,FAILED) && (txn_date#11 = 1498780800000000)) && (txn_time#14 >= 1498780800000000)) && (txn_time#14 <= 1500076800000000))  
 +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@3c12f87d[txn_status#10,txn_time#14,txn_amt#39,user_name#12,merchant_name#13,txn_status#45,txn_date#11] PushedFilters: [In(txn_status,[Ljava.lang.Object;@4f1cb802), EqualTo(txn_date,2017-06-30), GreaterThanOrEqual(txn_time,2017-06-30 00:00:00.0), LessThanOrEqual(txn_time,2017-07-15 00:00:00.0)]  

Example 4

spark-sql> explain select min(txn_time),max(txn_time) from transactions where txn_date ='2017-06-30' and txn_status='FAILED';  
== Physical Plan ==  
TungstenAggregate(key=[], functions=[(min(txn_time#38),mode=Final,isDistinct=false),(max(txn_time#38),mode=Final,isDistinct=false)], output=[_c0#45,_c1#46])  
+- TungstenExchange SinglePartition, None  
 +- TungstenAggregate(key=[], functions=[(min(txn_time#38),mode=Partial,isDistinct=false),(max(txn_time#38),mode=Partial,isDistinct=false)], output=[min#51,max#52])  
 +- Project [txn_time#38]  
 +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@bf2f2ce[txn_time#38] PushedFilters: [EqualTo(txn_date,2017-06-30), EqualTo(txn_status,FAILED)]  
Time taken: 1.789 seconds, Fetched 6 row(s)  

spark-sql> explain select min(txn_time),max(txn_time) from transactions where txn_date =cast('2017-06-30' as date) and txn_status='FAILED';  
== Physical Plan ==  
TungstenAggregate(key=[], functions=[(min(txn_time#38),mode=Final,isDistinct=false),(max(txn_time#38),mode=Final,isDistinct=false)], output=[_c0#54,_c1#55])  
+- TungstenExchange SinglePartition, None  
 +- TungstenAggregate(key=[], functions=[(min(txn_time#38),mode=Partial,isDistinct=false),(max(txn_time#38),mode=Partial,isDistinct=false)], output=[min#60,max#61])  
 +- Project [txn_time#38]  
 +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@bf2f2ce[txn_time#38] PushedFilters: [EqualTo(txn_date,2017-06-30), EqualTo(txn_status,FAILED)]  
Time taken: 1.379 seconds, Fetched 6 row(s)  

Example 5

-- Transactions partitioned by userid. Within a partition, rows are clustered
-- by txn_time (newest first) and then by txn_id.
-- userid is declared here because the PRIMARY KEY references it; its type
-- (text) matches the userid column of avm.transactions_by_userid_month.
-- Table options such as CLUSTERING ORDER BY must follow the closing
-- parenthesis of the column list, not sit inside it.
CREATE TABLE avm.transactions_by_userid (
	userid text,
	txn_status text,
	txn_date date,
	user_name text,
	merchant_name text,
	txn_amt int,
	txn_id uuid,
	txn_time timestamp,
	PRIMARY KEY (userid, txn_time, txn_id)
) WITH CLUSTERING ORDER BY (txn_time DESC, txn_id ASC);

spark-sql> explain select userid,min(txn_time) first_txn_date from avm.transactions_by_userid group by userid;  
== Physical Plan ==  
TungstenAggregate(key=[userid#3], functions=[(min(txn_time#4),mode=Final,isDistinct=false)], output=[userid#3,first_txn_date#0])  
+- TungstenExchange hashpartitioning(userid#3,200), None  
 +- TungstenAggregate(key=[userid#3], functions=[(min(txn_time#4),mode=Partial,isDistinct=false)], output=[userid#3,min#47])  
 +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@6da76bb1[userid#3,txn_time#4]  
Time taken: 3.655 seconds, Fetched 5 row(s)  

CATEGORIES