Delta Live Tables || Error message meaning and resolution



Error:

RuntimeError: Query defined in function 'functionName' returned 'NoneType'. It must return either a Spark or Koalas DataFrame.


Resolution:

Check the function that defines the DLT table and verify that it actually returns a DataFrame. Every function decorated with @dlt.table (or @dlt.view) must return a Spark DataFrame; if the function ends without a return statement, it implicitly returns None and this error is raised.
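
A minimal sketch of a DLT function that satisfies this requirement; the source table name source_db.raw_events is a placeholder:

import dlt
from pyspark.sql import functions as F

@dlt.table(name="events_clean")
def events_clean():
    df = spark.read.table("source_db.raw_events")      # placeholder source table
    # The function must end by returning a DataFrame, otherwise it returns None.
    return df.filter(F.col("event_id").isNotNull())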

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Error:

Could not materialize 'tableName' because a STREAMING_TABLE table already exists with that name.

Resolution:

As the error message indicates, a streaming table with the same name already exists, and the pipeline is now trying to create a materialized view under that name. Either give the materialized view a different name, or drop the existing streaming table (and fully refresh the pipeline) before re-running.
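
If the existing streaming table is no longer needed, one option (a sketch; the three-level table name is a placeholder) is to drop it from a regular notebook and then re-run the pipeline:

# Run outside the DLT pipeline, e.g. in an interactive notebook.
spark.sql("DROP TABLE IF EXISTS catalog_name.schema_name.tableName")
# Afterwards, re-run the pipeline so it can materialize the view under that name.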


----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Always remember this when you are working with streaming datasets:

When dealing with streaming DataFrames in Spark, performing a left anti join where the streaming DataFrame is on the right side of the join is not currently supported. Keep the streaming DataFrame on the left side and the static DataFrame on the right.
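
A sketch of the supported orientation, using a rate source purely for illustration and a static lookup built with spark.range:

streaming_df = spark.readStream.format("rate").load()               # streaming side (left)
static_df = spark.range(0, 100).withColumnRenamed("id", "value")    # static side (right)

# Supported: streaming DataFrame on the left of the left_anti join.
result = streaming_df.join(static_df, on="value", how="left_anti")

# Not supported: static_df.join(streaming_df, on="value", how="left_anti")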


----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

com.databricks.pipelines.InvalidContextState: read(...) or readStream(...) must not be called outside of graph evaluation.


Resolution:

You have to define a Delta Live Table using the @dlt.table decorator (with import dlt at the top of the notebook) and put your read/transform code inside that function, for example:

@dlt.table(name="testing")
def testing_data():
    return spark.read.table("source_table")   # replace with your own read/transform logic

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


Difference between a Delta table and Delta Live Tables

 


What's the difference between a Delta table and Delta Live Tables?

----------------------------------------------------------------------------------------------------------

A Delta table is a way to store data in tables, whereas Delta Live Tables allows you to describe how data flows between these tables declaratively. Delta Live Tables is a declarative framework that manages many Delta tables by creating them and keeping them up to date. In short, Delta tables are a data format, while Delta Live Tables is a data pipeline framework (source: https://learn.microsoft.com/en-us/azure/databricks/introduction/delta-comparison ).


How to create a materialized view in Delta Live Tables

---------------------------------------------------------------------------------------------------------

You can create a materialized view in DLT by returning a batch (non-streaming) DataFrame from an @dlt.table function, for example:

spark.table("db_name.schema_name.table_name")

OR

spark.createDataFrame([], schema=schema)

Remember: if you read the source with dlt.read_stream (or spark.readStream), the dataset is created as a streaming table, not a materialized view.
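
A minimal sketch of a materialized-view definition in a DLT pipeline; the source table name is a placeholder:

import dlt

@dlt.table(name="daily_sales_mv", comment="Materialized view built from a batch read")
def daily_sales_mv():
    # A batch read (spark.table / dlt.read) makes this a materialized view;
    # using dlt.read_stream / spark.readStream here would make it a streaming table.
    return spark.table("db_name.schema_name.sales").groupBy("sale_date").count()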


SPARK interview Q/A

Apache Spark Concepts

1. Immutability of RDDs/Dataframes in Apache Spark

Immutability ensures consistency and fault tolerance in distributed environments. It simplifies parallel processing and caching mechanisms, making Spark more resilient. It allows the ability to regenerate the previous steps of the DAG in case of failures, crucial in distributed environments with concurrent processes.

2. "count" in groupBy as Transformation vs. Action

Count acts as a transformation when used after groupBy, returning a DataFrame. When used directly, it acts as an action, returning data to the driver. This distinction is made based on the context of its usage.

3. Laziness of Transformations in Spark

Laziness in transformations allows Spark to optimize execution plans and minimize unnecessary computations. It enhances performance by executing only when necessary, providing optimized computation plans by the time an action is triggered.

4. Partition Skew in Apache Spark

Partition skew occurs when data distribution is significantly uneven across partitions. Mitigation involves techniques like repartitioning, using salting, or leveraging advanced partitioning strategies in Spark.

5. Normal Join vs. Broadcast Join

Normal join involves shuffling data across nodes, leading to potential overhead. Broadcast join optimizes the process by distributing a smaller dataset to each node, minimizing shuffling and improving efficiency.
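
A short sketch of an explicit broadcast join; the table names and join key are placeholders:

from pyspark.sql.functions import broadcast

sales = spark.table("sales")        # large fact table (placeholder)
courses = spark.table("courses")    # small lookup table (placeholder)

# broadcast() hints Spark to ship the small table to every executor,
# so the large table does not need to be shuffled for the join.
joined = sales.join(broadcast(courses), on="course_id", how="inner")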

6. Serialization Issue in Spark

Serialization in Spark converts objects to a format for storage or transmission. A serialization issue may arise in scenarios like inefficient shuffling. Solutions include using efficient serialization formats like Kryo and implementing the Serializable interface for custom classes.

7. Overview of Apache Spark Concepts

Various Apache Spark concepts include Resilient Distributed Dataset (RDD), DataFrame, Dataset, transformations, actions, directed acyclic graph (DAG), stages, tasks, cluster manager, driver program, executors, shuffling, Catalyst optimizer, distributed caching, broadcast variables, and more.

8. Cluster vs. Cluster Manager

In distributed computing, a cluster is a group of interconnected computers or nodes, while a cluster manager is software responsible for managing and coordinating resources across the nodes in a cluster.

9. Driver Memory Allocation in Spark

When submitting a Spark application, driver memory can be specified using properties like spark.driver.memory and spark.driver.memoryOverhead. Overhead memory is used for managing resources and Spark application state.
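
For example, a spark-submit sketch; the memory values and application file are illustrative, not prescriptive:

spark-submit \
  --conf spark.driver.memory=8g \
  --conf spark.driver.memoryOverhead=820m \
  my_job.py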

Spark Interview questions with answers




0. *What are deletion vectors?*

Deletion vectors are a storage optimization feature that can be enabled on Delta Lake tables. By default, when a single row in a data file is deleted, the entire Parquet file containing the record must be rewritten. With deletion vectors enabled for the table, DELETE and UPDATE operations use deletion vectors to mark existing rows as removed or changed without rewriting the Parquet file. Subsequent reads on the table resolve current table state by applying the deletions noted by deletion vectors to the most recent table version.

Databricks recommends using Databricks Runtime 14.1 and above to write tables with deletion vectors to leverage all optimizations. You can read tables with deletion vectors enabled in Databricks Runtime 12.1 and above.

*How to enable deletion vectors?*

CREATE TABLE <table-name> [options] TBLPROPERTIES ('delta.enableDeletionVectors' = true);

ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);



1. Why are RDDs/DataFrames in Apache Spark immutable?

The creators of Spark could have made them mutable so that we could keep making changes to the same RDD/DataFrame. Why didn't they?

Immutability ensures consistency and fault tolerance in distributed environments. It simplifies parallel processing and caching mechanisms, making Spark more resilient.

It also gives Spark the ability to regenerate the previous steps of the DAG when a failure occurs. Because Spark uses in-memory computation in a distributed environment, multiple processes can operate on the same data at the same time, and immutable data keeps those concurrent operations consistent.

2. Why does "count" act like a transformation when used after groupBy, and like an action when used directly?

Why did the creators of Spark design it this way?

When used directly, count does not return another DataFrame; it returns a number to the driver, so it acts as an action. When used after groupBy, it returns a DataFrame (one count per group), so it acts as a transformation and only executes when a later action is triggered.
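
A small sketch (assuming an active SparkSession named spark) illustrating the difference:

df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

total = df.count()                    # action: returns 3 (a plain Python int) to the driver
grouped = df.groupBy("key").count()   # transformation: returns a new DataFrame, nothing executes yet
grouped.show()                        # this action triggers the actual computation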

3. Ideally laziness is not considered a good thing, so why are transformations in Spark lazy, and what benefit do we get?

Laziness in transformations allows Spark to optimize execution plans and minimize unnecessary computations. It enhances performance by executing only when necessary: by the time an action is triggered, Spark has already prepared the best execution plan it can.

Spark transformations are lazy because Spark wants to see the bigger picture first. Suppose we read a DataFrame and later filter it on a condition. Because of laziness, Spark can push that filter down towards the data source (predicate pushdown) instead of reading everything and filtering afterwards. Laziness is what makes these optimizations and efficient query plans possible.
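
A sketch (the file path and column are placeholders) showing that nothing runs until an action, and that explain() exposes the optimized plan, typically listing the filter under PushedFilters in the file scan:

df = spark.read.parquet("/data/events")       # lazy: no data is read yet
filtered = df.filter(df.country == "IN")      # lazy: only extends the logical plan

filtered.explain()                            # inspect the optimized physical plan
filtered.count()                              # action: the optimized plan executes now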

4. What is partition skew and how do we deal with it in Apache Spark?

Skew means the data distribution is significantly uneven across the available partitions: some partitions hold a lot of data while others hold very little, which hurts parallel processing because a few tasks run much longer than the rest.

Mitigation involves repartitioning, salting the join/grouping key, or leveraging more advanced partitioning strategies in Spark (see the salting sketch below).
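
A minimal salting sketch for a skewed aggregation key (df, key, and amount are placeholders): the hot key is spread over N salt buckets, partially aggregated, and then re-aggregated.

from pyspark.sql import functions as F

N = 10  # number of salt buckets (a tuning choice)

salted = df.withColumn("salt", (F.rand() * N).cast("int"))

# First aggregate on (key, salt), which spreads a hot key across up to N tasks...
partial = salted.groupBy("key", "salt").agg(F.sum("amount").alias("partial_sum"))

# ...then combine the partial results per key.
result = partial.groupBy("key").agg(F.sum("partial_sum").alias("total_amount"))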

What is liquid clustering used for?

Databricks recommends liquid clustering for all new Delta tables. The following are examples of scenarios that benefit from clustering (a short CLUSTER BY sketch follows the list):

  • Tables often filtered by high cardinality columns.
  • Tables with significant skew in data distribution.
  • Tables that grow quickly and require maintenance and tuning effort.
  • Tables with concurrent write requirements.
  • Tables with access patterns that change over time.
  • Tables where a typical partition key could leave the table with too many or too few partitions.
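
A minimal sketch of enabling liquid clustering when creating a table (catalog, schema, table, and column names are placeholders):

spark.sql("""
  CREATE TABLE catalog_name.schema_name.sales_clustered (
    sale_id BIGINT,
    customer_id BIGINT,
    sale_date DATE
  )
  CLUSTER BY (customer_id, sale_date)
""")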


5. Difference between a normal join and a broadcast join

Let me try explaining this with a super simple example 

Consider that you started an EdTech company with 4 of your friends, and all of you live in different locations. Your EdTech offers hundreds of courses, and each course has a course id, course name, price, duration, and so on. Say 1 lakh people purchased your courses, and all 1 lakh sales transactions are recorded in one physical notebook. Each page of the notebook holds 1,000 records, so there are 100 pages in total. This 100-page notebook records the student name, student email, course id, and so on. Note: the course name is not part of it.

You also have one small single sheet that maps all 100 course ids to their course names. It acts like lookup data (a small table), for example: 1, bigdata; 2, datascience; 3, devops.

Your end goal is to know which student purchased which course, so you need to join the sales data with the one-page lookup data. Since you want to do it quickly, you divide the task among the 5 of you: 100 pages of sales data split across 5 people means 20 pages each. But the lookup sheet is a single sheet, and only you have it. Since you have the lookup sheet, you find it easy to combine both datasets and tell which person bought which course. The other 4 people, however, do not have the lookup sheet, so they always need to consult you (shuffle activity). They keep bothering you and waste their time and yours (overhead involved). This is how a normal join would have worked.

Now let's see the smarter way of doing it with a broadcast join. What if all 5 of you somehow memorize the lookup sheet? Since the sheet holds just 100 course entries (small data), it is feasible for all of you to retain it in memory. Just by memorizing a few small lines (a slight overhead), your team can do the join very efficiently, because no interaction among the 5 friends is required (no shuffle required).

In this example, treat each person as one machine/node, so we have a 5-node cluster where each node holds:
1. a part of the big table (the sales table, 20k records per node), and
2. the complete small table (the lookup data) in memory.

This is how a broadcast join can be performed, provided we have one large table and one small table: the small table is broadcast so that no shuffling is required. Remember that shuffling is a very costly operation, and we should always try to minimize or avoid it whenever possible.

Normalization vs Denormalization

Normalization is the process of dividing data into multiple smaller tables with the intent of reducing data redundancy and inconsistency. Denormalization is the opposite idea: it is the technique of combining data into one big table, which definitely leads to redundancy in the data.

Note: redundancy causes inconsistency. If the same data is repeated in two places and, while updating, you change it in one place but forget the second, the data ends up in an inconsistent state.

When retrieving data from normalized tables, we need to read many tables and perform joins, which is a costly operation. Reading from denormalized tables is quite fast because no joins are required.

When to use normalized tables vs denormalized ones? For OLTP (Online Transaction Processing) systems, where we deal with lots of inserts, deletes, and updates, you should go for normalized tables.
However, for OLAP (Online Analytical Processing) systems, where you need to analyse historical data, denormalized tables are the best fit. Since you won't be updating the data here, the redundancy does not lead to an inconsistent state.

A simple example: when you make a purchase on Amazon, that requires an OLTP system (an RDBMS-style database), so those tables should be normalized. When Amazon analyses historical data, it builds denormalized tables so that the analysis is faster and costly joins are avoided.

How to get started with Spark optimization?

We often have this confusion around the initial steps to take when starting to optimize jobs. Here is a practical way to start once an optimization task is given to you:

Step 01: Ensure the pipeline is in a runnable state and let it run for about 10 minutes. If the pipeline doesn't succeed, check the Spark UI logs.
Step 02: In the Jobs section, check the duration of the active and pending jobs. A duration greater than 30 minutes is a red flag indicating that the job needs to be optimized at a particular stage.
Step 03: Using the job id and description, identify the DataFrame being executed as that job, which drills down to the piece of code that is taking more time.
Step 04: If you can't find the exact DataFrame, open the underlying stages of the job and find which data operation in the stage is taking too much time (broadcast, groupBy, aggregate functions, etc.).
Step 05: Drill down further in your code to find where these process-heavy operations are used and which tables are involved.

This gives you an initial understanding of where your job is getting stuck and provides the doorway to optimize it.

Common Spark interview questions:

1. What is an RDD?
2. Difference between transformations and actions, with examples
3. DataFrame vs Dataset
4. What are shared variables, with examples
5. What is lazy evaluation in Spark?
6. Difference between repartition and coalesce
7. Difference between reduceByKey and groupByKey
8. Well-known optimization techniques used in Spark applications
9. What are serialization and deserialization?
10. Difference between deploying a Spark application in cluster mode vs client mode
11. Difference between cache and persist
12. What are DStreams?
13. What are the Catalyst optimizer and the lineage graph?
14. Debugging techniques in Spark
15. Basics of dynamic memory allocation in Spark
16. How to calculate driver memory, executor memory, and the number of executors to reduce failures of a Spark application on a cluster

A puzzle on tasks vs partitions: we have always learnt that the number of tasks equals the number of partitions in a Spark DataFrame. Say we created a single-node Databricks cluster with 8 GB RAM and 4 CPU cores and uploaded a 2.1 GB file. When we create a DataFrame it shows 19 partitions, which is expected. But when we run a groupBy, stage 1 shows 8 tasks (all handling roughly the same amount of data) and stage 2 shows 200 tasks (note: AQE is disabled). Ideally the number of tasks in stage 1 should have been 19, so why do we have 8 tasks?

What are the two main types of transformation operations in Spark?

There are two main types of transformation operations in Spark:

Narrow transformations: transformations in which each input partition contributes to at most one output partition.
This means that Spark can execute these transformations without shuffling data across the network. Examples of narrow transformations include map, filter, union, and so on.

Wide transformations: transformations in which each input partition can contribute to multiple output partitions. This means that Spark needs to shuffle data across the network to ensure that all the data required for the transformation is co-located on the same worker node. Examples of wide transformations include groupBy, reduceByKey, sortByKey, and so on.

Explain lineage in Spark.

Lineage in Spark is a fundamental concept that enables fault tolerance and data recovery without relying on explicit data replication or checkpointing. It refers to the logical representation of the transformations applied to an RDD (Resilient Distributed Dataset) or DataFrame.

When you apply transformations on an RDD or DataFrame in Spark, such as map, filter, or groupBy, the transformations are not immediately executed. Instead, Spark builds a lineage graph that tracks the dependencies between the input data and the applied transformations.

The lineage graph is a directed acyclic graph (DAG) where each node represents an RDD or DataFrame and the edges represent the dependencies between them. The graph captures the entire history of transformations from the initial data source to the final result. It provides a scalable and efficient approach to handling failures in distributed computations, and it also plays a crucial role in optimizing the execution plan in Spark.

Transformations are operations on an RDD or DataFrame that cause Spark to create a new RDD/DataFrame with some changes relative to the previous one, for example applying a filter on existing data or taking the union of multiple datasets. Because Spark follows a lazy-execution approach, none of the transformations applied to a DataFrame or RDD are executed until we hit an action, which forces Spark to create a job and return the results to the driver. All the transformations together form a transformation lineage or dependency graph, i.e. the sequence of transformations. filter(), groupBy(), map(), and sortByKey() are examples of transformations in Spark.

Actions are operations that require Spark to do computation and create a new job to fetch the result of the transformation(s) applied so far. collect(), show(), and count() are all actions in Spark. What other examples of transformations and actions can you think of?

What is a serialization issue in Spark?

Serialization in Spark refers to the process of converting an object in memory to a format that can be stored or transmitted over a network. This process is important for Spark because it allows data to be distributed across a cluster of machines for processing. However, serialization can also lead to performance issues if not handled properly.

An example of a serialization issue might occur when performing a groupBy operation on a large dataset. The data needs to be shuffled across the nodes in the cluster to perform the groupBy, and it has to be serialized and deserialized during this process. If the data is not serialized efficiently, this can become a significant performance bottleneck. One way to address this is to use a more efficient serialization format, such as Kryo.
Kryo is a more efficient serialization library for Spark that can significantly improve performance by reducing the amount of data that needs to be sent over the network. Another way to improve serialization performance is to make sure that any custom classes used in your Spark code implement the Serializable interface, so that Spark can serialize and deserialize them efficiently.

Here is an example of how to configure Spark to use Kryo as the serialization format:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

Note: in this example we set the serializer property to "org.apache.spark.serializer.KryoSerializer" to use Kryo as the serialization format in Spark.

Exploring Spark deduplication

Deduplication refers to the process of removing duplicate records from a dataset, and Spark provides various features and functions to achieve it. The deduplication process typically involves using transformations to identify and filter out duplicate records based on one or more columns; the specific approach depends on the nature of the data and the criteria for determining duplicates. Here is one approach using the dropDuplicates function:

// Step 1: Load the input dataset
val inputDF = spark.read.csv("input_path/data.csv")

// Step 2a: Identify and remove duplicates based on specific columns
val deduplicatedDF = inputDF.dropDuplicates("column1", "column2")

// Step 2b: Identify and remove duplicates based on all columns
val deduplicatedAllDF = inputDF.dropDuplicates()

Specify inside dropDuplicates the column names on which duplicates should be identified; one or more columns can be provided. If we want to deduplicate based on all columns of a DataFrame, we can call dropDuplicates without arguments, which considers every column when identifying and removing duplicate rows. Note that deduplication based on all columns can be an expensive operation, especially if the DataFrame has many columns or the columns have high cardinality.

Other approaches for deduplication include:
  • using groupBy and agg, i.e. grouping the data and aggregating it;
  • using a window function with row_number;
  • using hashing;
  • using distinct.

Interview questions (with brief answers where given):

Q) How do you handle nulls in data? (df.na.drop() or df.dropna(), or use when/otherwise to assign a value, e.g. df = df.withColumn("a", when(col("b").isNull(), "M")))
Q) How do you read files recursively in a folder in PySpark? (use the recursiveFileLookup option, or use *.file_type at the folder level)
Q) When to use partitioning and when to use bucketing? (when partitioning the data creates a small-file problem, go for bucketing; e.g. partitioning data on age (1-100) would generate 100 small partitions, so instead bucket the data into 3 buckets: 1-33, 34-67, 68-100)
Q) Query to return the 2nd highest salary and to remove duplicates.
Q) Delta tables, the transaction log, and time-travel properties (restore a Delta table, read an older version of a Delta table, etc.)
Q) Optimization techniques used in the project.

Round-2
Q) The basic working of various window functions like lead, lag, dense_rank(), etc.
Q) Generate a cumulative sum over the salary column of a table per department.
Q) Questions on dimension tables vs fact tables and on the star schema.
Q) General idea of the implementation of SCD-2. (The SQL should keep 3 extra columns, e.g. a flag, start_date, and end_date, to denote whether each record is active and from when until when, as per SCD-2.)
Q) Count the number of occurrences of 'p' in a column Fruit, for example:

Fruit
-------
apple     -> 2
pineapple -> 3

(with cte as (select len(fruit) as l1, len(replace(fruit, 'p', '')) as l2 from t)
 select (l1 - l2) as p_count from cte)

Q) 0, 1, 1, 2, 3, 5, 8 -> write a function to return the Fibonacci series. (Used recursion: return fibb(n-1) + fibb(n-2).)
Q) Find the customers who missed at least two due dates, using PySpark:

CusId  Duedate     PaymentDate
C1     01/01/2019  12/30/2018
C1     01/02/2019  01/25/2019
C1     01/02/2019  01/24/2019
C2     05/01/2019  06/01/2019
C2     05/02/2019  02/02/2019
C2     05/03/2019  07/03/2019

Take it as a challenge to solve.

Round-3
Q) SQL problem with DDL statements (solution provided in the first comment of the original post).
Q) One situation-based question: equally distribute water between 3 people, with some conditions.
Q) Reading a CSV file without headers and writing it to Parquet.
Q) Avro vs Parquet vs ORC.
Q) Driver vs worker nodes: how will they behave when memory/cores are increased aggressively?
Q) Questions on how compression algorithms work internally for Parquet, and on the internal working of serialization and deserialization of data.
Q) How did you handle data skewness?
1. Calculate the total raw data size: %sh du -sch /dbfs:"source path"
2. Calculate the total partitions needed by dividing the data size by 128 MB (e.g. 140 * 1024 / 128).
3. Get the total existing partitions in the source: df.rdd.getNumPartitions()
4. Check the skewness: df.withColumn("c", spark_partition_id()).groupBy("c").count()
Compare steps 2 and 3: if the partitions needed in step 2 exceed the existing partitions in step 3, use repartition (since we need to increase the number of partitions); otherwise use coalesce().

Round-1
Q1) T1: employee(emp_id, emp_name, salary, dep_id), T2: department(dep_id, dep_name). a) Check for duplicates and remove them. b) Select all employees that have a department assigned from the emp table, and show 'not assigned' for those that don't.
Q2) We have 2 pipelines P1 and P2 in 2 different Azure subscriptions. How would you set a dependency for P2 on P1 such that P2 triggers after P1 completes? (Answered using a tumbling window trigger with offset + interval.)
Q3) T1: employee(emp_id, emp_name, salary, dep_id), T2: department(dep_id, dep_name). From the emp source find the 5th highest salary in each department. If the number of distinct records in a department is less than 5, return the highest available (e.g. if a department has 4 records, find the 4th highest; if 3, the 3rd highest). If there are more than 5 records, always find the 5th highest salary. (Was able to solve it using multiple CTEs + window functions.)
Q4) WHERE vs HAVING: when to use which?
Q5) Design a pipeline to copy files having size greater than 50 MB from a folder to an Azure SQL DB server. (Answered: step 1 - create a linked service for the external storage plus a dataset pointing to the folder containing the files; step 2 - Get Metadata (to list all files) -> ForEach (loop over each file) -> Get Metadata (get the file size) -> If condition (check for files > 50 MB) -> Copy activity.)
Q6) Design a pipeline to copy, out of 50 files in a folder, only those that are a month old. (Same concept as above.)

Round-2
Q1) If you have an on-prem server, which type of integration runtime (IR) would you prefer and why? (Answered: self-hosted IR.)
Q2) Could the on-premise server also be accessed with the default Azure IR? Could the self-hosted IR access a SQL Server hosted on Azure cloud? Plus a few more IR questions. (Struggled a lot on the IR questions, so prepare IR very well for your interviews.)
Q3) Design a pipeline to carry out a data archival process from an ADLS input folder to SQL Server. (Answered: 2 Copy Data activities, the first copying from the input folder to SQL Server, the second moving the data from the input folder to an archival folder, followed by a Delete activity to remove the files from the input folder.)
Q4) PySpark code to get the 3rd highest salary in each department.
Q5) T1: movie(movie_id, movie_name, release_date, genre, rating), T2: contributors(movie_id, contributor_name, contributor_role (actor/actress/director)). Find the top 5 most famous action heroes of the last 2 years; write the solution in PySpark. (They wanted an optimized solution in limited time; I had provided a basic solution.)
Q6) The emp table has 10 rows and the dep table has 5 rows. To get all rows from the emp table, which join should be used? If a left join is used, can we achieve the same result with a right join, and how? If the same thing can be done with both joins, why do we have 2 joins instead of just a left join? Under what conditions will left and right joins not give the same result? (When more than 2 tables are involved, left and right joins won't give the same result.)

Driver memory allocation in Spark

When we submit a Spark application to a YARN cluster, the ResourceManager creates an application master, and (in cluster mode) the driver is started inside that application master. We request driver memory at spark-submit time, and there are two config properties for it:
1) spark.driver.memory
2) spark.driver.memoryOverhead (the overhead factor can be set by the admin, commonly 0.1)

Suppose we ask for 8 GB of driver memory. We are actually requesting 8 GB + max(384 MB, 10% of the requested memory), where max(384 MB, 10% of the requested memory) is the overhead memory. The driver JVM heap gets the requested driver memory, while the overhead memory sits outside the heap.

Driver overhead memory is the amount of memory needed by the driver to administer resources and carry out the responsibilities of a Spark application (in simple words, it stores driver metadata). This includes managing the job, scheduling the stages, and preserving the application's state, such as RDD lineage. It is important to allocate enough driver overhead memory to avoid out-of-memory errors during Spark application execution.
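
A quick sketch of the arithmetic for the 8 GB example above:

driver_memory_mb = 8 * 1024
overhead_mb = max(384, 0.10 * driver_memory_mb)    # max(384 MB, 10% of driver memory)
total_request_mb = driver_memory_mb + overhead_mb
print(overhead_mb, total_request_mb)               # 819.2 MB overhead, ~9011 MB total container request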