CICD In Azure understanding

 


Structured Streaming vs Auto Loader



Structured Streaming

Structured Streaming is an extension of the Spark SQL engine that processes continuous data streams as tables. It enables real-time ingestion and processing of streaming data.

Key Features

  • Supports real-time stream processing.
  • Uses micro-batch or continuous mode.
  • Works with Kafka, Azure Event Hubs, Delta Lake, S3, ADLS.
  • Fault tolerance through checkpointing and write-ahead logs (WAL).
  • Supports exactly-once processing when the source is replayable and the sink is idempotent.



from pyspark.sql.functions import col

# Define input path in DBFS
input_path = "dbfs:/mnt/streaming-data/"

# Read streaming data from DBFS
df = (spark.readStream
    .format("json")
    .schema("id INT, name STRING, timestamp STRING")  # Define schema
    .load(input_path))

# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Write to Delta Table in DBFS
query = (transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/structured_streaming")
    .start("dbfs:/mnt/delta/structured_streaming_output"))



Micro-Batch Streaming with GROUP BY Aggregation

Micro-Batch Trigger Modes

(A) Run Every Fixed Interval (Processing Time)

.trigger(processingTime="5 seconds") # Runs every 5 seconds (PySpark passes the trigger as a keyword argument)
  • Best for near real-time processing.
  • Controls the batch frequency to balance latency vs. resource utilization.

(B) Run Once and Stop

.trigger(once=True) # Runs once, processes all available data in a single batch, then stops
  • Useful for batch-style processing with streaming APIs.
  • Can be scheduled periodically via Databricks Jobs.

(C) Run for Available Data and Stop

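.trigger(availableNow=True) # Processes all data available at start, then stops
  • Similar to the run-once trigger, but it can split the backlog into several micro-batches that respect source rate limits (e.g. maxFilesPerTrigger) instead of one large batch.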
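
The difference between the run-once and available-now triggers is easiest to see with a backlog of files. The following is a toy model in plain Python, not Spark code: plan_batches is a hypothetical helper, and it only mimics how a rate limit such as maxFilesPerTrigger is ignored by the run-once trigger but honoured by the available-now trigger.

```python
def plan_batches(pending_files, trigger, max_files_per_trigger=None):
    """Toy model: return the micro-batches a stop-after-backlog trigger would run."""
    if trigger not in ("once", "availableNow"):
        raise ValueError(f"unsupported trigger: {trigger}")
    files = list(pending_files)
    if not files:
        return []
    # Run-once puts the whole backlog into one batch, whatever the rate limit says.
    if trigger == "once" or max_files_per_trigger is None:
        return [files]
    # Available-now slices the backlog into rate-limited micro-batches.
    step = max_files_per_trigger
    return [files[i:i + step] for i in range(0, len(files), step)]


backlog = [f"file_{i}.json" for i in range(5)]  # hypothetical file names
print(len(plan_batches(backlog, "once", max_files_per_trigger=2)))
print([len(b) for b in plan_batches(backlog, "availableNow", max_files_per_trigger=2)])
```

With a large backlog, the single batch of the run-once trigger can be much heavier than the smaller batches of the available-now trigger, which is why the latter tends to scale better for scheduled jobs.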


from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Define schema
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Define input path
input_path = "dbfs:/mnt/streaming-sales/"

# Read streaming data in micro-batches
df = (spark.readStream
    .format("json")
    .schema(schema)
    .load(input_path))

# Perform Group By aggregation (Total sales per category)
aggregated_df = (df
    .groupBy("category")
    .agg(spark_sum("amount").alias("total_sales")))

# Write output with micro-batching (every 10 seconds)
query = (aggregated_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/microbatch_groupby")
    .trigger(processingTime="10 seconds")  # Micro-batch every 10 seconds (keyword argument in PySpark)
    .outputMode("complete")  # Required for aggregations without watermarking
    .start("dbfs:/mnt/delta/microbatch_groupby_output"))




When to Use foreachBatch() in Structured Streaming?

foreachBatch() is used when you need more control over each micro-batch, such as:
✅ Writing to multiple destinations (e.g., Delta + Kafka).
✅ Running custom transformations before writing.
✅ Writing to JDBC databases (e.g., MySQL, PostgreSQL, SQL Server).
✅ Implementing custom business logic for each batch.


from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

# Define schema
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("category", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Define input path
input_path = "dbfs:/mnt/streaming-sales/"

# Read streaming data in micro-batches
df = (spark.readStream
    .format("json")
    .schema(schema)
    .load(input_path))

# Define custom function for `foreachBatch()`
# batch_df is a regular (non-streaming) DataFrame; batch_id identifies the micro-batch
def write_to_multiple_sinks(batch_df, batch_id):
    # Write to Delta Lake
    batch_df.write.format("delta").mode("append").save("dbfs:/mnt/delta/foreachbatch_output")

    # Write to Console (for debugging)
    batch_df.show()

# Write with foreachBatch (runs on each micro-batch)
query = (df.writeStream
    .foreachBatch(write_to_multiple_sinks)  # Apply function for each batch
    .trigger(processingTime="10 seconds")  # Micro-batch every 10 seconds (keyword argument in PySpark)
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/foreachbatch_example")
    .start())
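
One thing to watch with foreachBatch() is that a micro-batch can be re-run after a failure, so a plain append may write the same rows twice. The sketch below shows one way to guard against that with Delta Lake's txnAppId and txnVersion writer options, which let Delta skip a batch it has already committed. The APP_ID value and the second sink path are assumptions made up for illustration, and the function expects the batch DataFrame that Spark passes in.

```python
APP_ID = "foreachbatch_example"  # hypothetical id; must stay the same across restarts
SINK_PATHS = (
    "dbfs:/mnt/delta/foreachbatch_output",
    "dbfs:/mnt/delta/foreachbatch_audit",  # hypothetical second destination
)


def delta_idempotency_options(app_id, batch_id):
    """Writer options that let Delta recognise and skip an already-committed batch."""
    return {"txnAppId": app_id, "txnVersion": batch_id}


def write_to_multiple_sinks(batch_df, batch_id):
    """foreachBatch callback: append the same micro-batch to every Delta sink."""
    opts = delta_idempotency_options(APP_ID, batch_id)
    batch_df.persist()  # the batch is written twice, so avoid recomputing it
    try:
        for path in SINK_PATHS:
            (batch_df.write
                .format("delta")
                .options(**opts)
                .mode("append")
                .save(path))
    finally:
        batch_df.unpersist()
```

The callback is passed to the stream in the usual way, i.e. df.writeStream.foreachBatch(write_to_multiple_sinks) with a checkpoint location. Non-Delta sinks such as JDBC need their own deduplication, since these options only apply to Delta writes.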


AUTOLOADER EXAMPLES 


from pyspark.sql.functions import col

# Define input path in DBFS
input_path = "dbfs:/mnt/autoloader-data/"

# Read new files using Auto Loader
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/schema/autoloader/")
    .load(input_path))

# Transform the data
transformed_df = df.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Write to Delta Table in DBFS
query = (transformed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/checkpoints/autoloader")
    .start("dbfs:/mnt/delta/autoloader_output"))
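
The Auto Loader read above relies on the default schema handling. As a rough sketch, the options below make that behaviour explicit: cloudFiles.inferColumnTypes asks for typed columns instead of all strings, and cloudFiles.schemaEvolutionMode controls what happens when new columns appear. The helper names autoloader_options and start_autoloader_stream are hypothetical, and spark is assumed to be the session Databricks provides.

```python
VALID_EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def autoloader_options(file_format, schema_location, evolution_mode="addNewColumns"):
    """Build the cloudFiles reader options for an Auto Loader stream."""
    if evolution_mode not in VALID_EVOLUTION_MODES:
        raise ValueError(f"unknown schema evolution mode: {evolution_mode}")
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,    # where the inferred schema is tracked
        "cloudFiles.inferColumnTypes": "true",           # infer types instead of all strings
        "cloudFiles.schemaEvolutionMode": evolution_mode,
    }


def start_autoloader_stream(spark, input_path, output_path, checkpoint_path, options):
    """Read files incrementally with Auto Loader and append them to a Delta path."""
    df = (spark.readStream
        .format("cloudFiles")
        .options(**options)
        .load(input_path))
    return (df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")  # let the Delta sink accept newly added columns
        .trigger(availableNow=True)     # process the backlog, then stop
        .start(output_path))


opts = autoloader_options("json", "dbfs:/mnt/schema/autoloader/")
```

In addNewColumns mode the stream stops when it meets a new column and picks up the evolved schema on restart, so this pattern is usually paired with a job that retries on failure.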



Summary: When to Use Each?

Feature          | Structured Streaming              | Auto Loader
-----------------|-----------------------------------|------------------------------------
Best for         | Continuous event-based streaming  | Incremental file ingestion
Use Case         | Kafka, event hubs, logs           | JSON, CSV, Parquet files in DBFS
Schema Evolution | Needs manual handling             | Auto-managed
Performance      | Good for real-time data           | Optimized for cloud storage & DBFS

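🚀 Auto Loader, which is itself a Structured Streaming source (cloudFiles), is the best choice for incrementally ingesting files from DBFS or cloud storage, whereas Structured Streaming sources such as Kafka or Event Hubs are better suited to real-time event-based processing.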





Delta Live Tables || Interview Questions and Answers

 



Delta Live Tables
It is a declarative ETL framework for the Databricks Data Intelligence Platform that helps data teams simplify streaming and batch ETL cost-effectively.

A declarative framework is a programming approach in which you specify what you want to achieve, not necessarily how to achieve it.
You declare the desired outcome, and the framework or system takes care of the implementation details.

• Focus on what, not how
• Abstraction of implementation
• Less manual control

Annotations (decorators in Python, such as @dlt.table) provide instructions or configuration details about how a particular piece of code should be treated or processed by a framework or tool.
Instead of writing explicit code to specify certain settings, you can use annotations to convey that information more concisely.

DLT lets you create datasets in the form of streaming tables, materialized views, and views, which maintain the results of declarative queries.

• Streaming Table
• Materialized View
• View

How to define a dataset

• Streaming tables and materialized views are both defined with @dlt.table; Databricks decides which one to create based on the type of read performed in the function.
○ To define a materialized view, perform a static read against a data source, e.g. spark.read.format("csv")
○ To define a streaming table, perform a streaming read against a data source, e.g. spark.readStream.format("cloudFiles")
§ Or declare it directly with dlt.create_streaming_table
• To define a view, use @dlt.view
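
As a minimal sketch of these rules, the function below registers one streaming table, one view, and one materialized view. The dataset names, column names, and source path are assumptions for illustration. The dlt module and the spark session are passed in as arguments only so the sketch can be read and exercised outside a pipeline; in a pipeline notebook the decorated functions would normally sit at the top level after import dlt.

```python
def define_datasets(dlt, spark, source_path="dbfs:/mnt/autoloader-data/"):
    """Register three datasets; the kind of read decides what @dlt.table creates."""

    @dlt.table(comment="Streaming table: the function performs a streaming read.")
    def orders_raw():
        return (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load(source_path))

    @dlt.view(comment="View: usable inside this pipeline, not published to the catalog.")
    def orders_with_amount():
        return dlt.read_stream("orders_raw").where("amount IS NOT NULL")

    @dlt.table(comment="Materialized view: the function performs a static read.")
    def sales_by_category():
        return dlt.read("orders_raw").groupBy("category").agg({"amount": "sum"})

    # Returned only so a caller can see what was declared.
    return ["orders_raw", "orders_with_amount", "sales_by_category"]
```

Inside a pipeline this would be invoked once as define_datasets(dlt, spark). Nothing is read at definition time; DLT calls the decorated functions when it builds and runs the pipeline graph.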


When to use 

• Streaming Table
○ A query is defined against a data source that is continuously or incrementally growing.
○ Query results should be computed incrementally.
○ High throughput and low latency are desired for the pipeline.

• Materialized View
○ You want to view the results of a query during development. Because tables are materialized and can be viewed and queried outside of the pipeline, using tables during development can help validate the correctness of computations.
○ Materialized views should be used for data sources with updates, deletions, or aggregations, and for change data capture (CDC) processing.
• View
○ Delta Live Tables does not publish views to the catalog, so views can be referenced only within the pipeline in which they are defined.
○ You want to reduce storage and compute costs and do not require the materialization of query results.
○ You have a large or complex query that you want to break into easier-to-manage queries.
○ You want to validate intermediate results using expectations.

DLT Expectations:

○ Delta Live Tables (DLT) offers out-of-the-box features for handling expectations and data quality monitoring to deliver high-quality data on Delta Lake.
○ We can define one or more validation rules and apply them to DLT datasets declaratively.
○ An expectation consists of three things:
§ A description, which acts as a unique identifier and allows you to track metrics for the constraint.
§ A boolean statement that always returns true or false based on some stated condition.
§ An action to take when a record fails the expectation, meaning the boolean returns false.
○ There are three actions we can declare for a validation rule:
§ Warning   -->   @dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
§ Drop --> @dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
§ Fail --> @dlt.expect_or_fail("valid_count", "count > 0")
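
To make the three actions concrete, here is a toy model in plain Python. It is not the DLT API: apply_expectation is a hypothetical helper that mimics what warn, drop, and fail do to a batch of records, and the sample rows are made up.

```python
def apply_expectation(records, name, predicate, action="warn"):
    """Toy model of an expectation: returns (records_kept, metrics)."""
    if action not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown action: {action}")
    failed = [r for r in records if not predicate(r)]
    metrics = {"name": name, "passed": len(records) - len(failed), "failed": len(failed)}
    if action == "fail" and failed:
        # Fail: the update stops as soon as invalid records are found.
        raise ValueError(f"expectation '{name}' violated by {len(failed)} record(s)")
    if action == "drop":
        # Drop: invalid records are removed before writing to the target.
        kept = [r for r in records if predicate(r)]
    else:
        # Warn: invalid records are still written; only the metrics record them.
        kept = list(records)
    return kept, metrics


rows = [{"count": 3}, {"count": 0}]  # made-up sample data


def is_valid(row):
    return row["count"] > 0


kept_warn, metrics_warn = apply_expectation(rows, "valid_count", is_valid, "warn")
kept_drop, metrics_drop = apply_expectation(rows, "valid_count", is_valid, "drop")
```

In both cases the metrics count one failed record; the only difference is whether that record reaches the target.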


CDC in Delta Live Tables:
• We generally use MERGE statements to implement SCD type 1 and SCD type 2, which means writing fairly complex logic to handle the different conditions.
• With the apply_changes() function, however, we only need to pass a few details in the syntax provided by Databricks to perform SCD type 1 and SCD type 2.

• ignore_null_updates
○ Allows ingesting updates that contain a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null retain their existing values in the target.
• apply_as_deletes
○ Specifies when a CDC event should be treated as a DELETE rather than an upsert.
• apply_as_truncates
○ Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.
• sequence_by
○ The column that gives the logical order of CDC events, so that events arriving out of order are handled correctly. When the same key appears more than once, the event with the latest sequence value (for example a timestamp or version) wins, and for SCD type 2 the __START_AT and __END_AT columns are set accordingly.

• track_history_column_list
○ Used with SCD type 2 to maintain historical changes to a row for only a subset of columns: it decides which columns need their history tracked.
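
The effect of these parameters can be illustrated with a toy SCD type 1 model in plain Python. This is not the DLT implementation: apply_changes_scd1 is a hypothetical helper that borrows the parameter names, takes a Python predicate where DLT takes a SQL expression, and ignores details such as late-arriving events across separate updates. The event data is made up.

```python
def apply_changes_scd1(target, events, keys, sequence_by,
                       is_delete=None, ignore_null_updates=False):
    """Toy SCD type 1 upsert: target maps a key tuple to the latest row."""
    # Ordering by the sequence column makes out-of-order arrivals harmless.
    for event in sorted(events, key=lambda e: e[sequence_by]):
        key = tuple(event[k] for k in keys)
        if is_delete is not None and is_delete(event):
            target.pop(key, None)  # treat the event as a DELETE
            continue
        if ignore_null_updates and key in target:
            # Nulls in the event keep the value already in the target.
            merged = dict(target[key])
            merged.update({c: v for c, v in event.items() if v is not None})
        else:
            merged = dict(event)
        target[key] = merged
    return target


events = [
    {"id": 1, "name": "Ana", "city": None, "seq": 3, "op": "UPSERT"},
    {"id": 1, "name": "Ann", "city": "Pune", "seq": 1, "op": "UPSERT"},
    {"id": 2, "name": "Bob", "city": "Delhi", "seq": 2, "op": "UPSERT"},
    {"id": 2, "name": None, "city": None, "seq": 4, "op": "DELETE"},
]

result = apply_changes_scd1(
    {}, events, keys=["id"], sequence_by="seq",
    is_delete=lambda e: e["op"] == "DELETE",
    ignore_null_updates=True,
)
```

Here id 1 ends up with the name from the later event (seq 3) while keeping the city from the earlier one, because the null city is ignored, and id 2 is removed by the delete event.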

Append Flow:
• Use an append flow whenever you want to ingest data from multiple source tables into a single streaming table.
• For example, you might have a table that combines regional data from every region you’re operating in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh.
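
A minimal sketch of the regional example, assuming hypothetical source tables named customers_<region> and a target called customers_all. It uses dlt.create_streaming_table and the @dlt.append_flow decorator; the dlt module and spark session are passed in as arguments only so the sketch can be exercised outside a pipeline.

```python
def define_regional_flows(dlt, spark, regions, target="customers_all"):
    """Create one target streaming table and one append flow per region."""
    dlt.create_streaming_table(target)

    def make_flow(region):
        # A factory function binds the region now, avoiding late binding in the loop.
        @dlt.append_flow(target=target, name=f"customers_{region}_flow")
        def regional_flow():
            return spark.readStream.table(f"customers_{region}")
        return regional_flow

    return [make_flow(region) for region in regions]
```

In a pipeline this would be called as define_regional_flows(dlt, spark, ["us", "eu"]). Rolling out a new region then means adding one entry to the list; the rows already written by the existing flows stay in place.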



Skills Needed for Data Engineers

Primary Responsibilities & Qualifications:

  • Lead data engineering activities by working closely with various teams and team members.
  • Extensive software development experience with Agile development life cycle processes and tools.
  • Strong understanding of data engineering concepts and best practices.
  • Proficiency in SQL and experience with data modeling techniques.
  • Familiarity with AWS services, particularly Redshift, S3, and Glue.
  • Knowledge of ETL (Extract, Transform, Load) processes and tools.
  • Excellent problem-solving and troubleshooting skills.
  • Strong communication skills to collaborate with cross-functional teams.

Data Warehouse Modeling:

  • Design and implement data models for the data warehouse.
  • Create and maintain data schemas, tables, and relationships.
  • Optimize data models for query performance and storage efficiency.
  • Ensure data integrity and enforce data quality standards.

Data Ingestion:

  • Develop and maintain data ingestion pipelines.
  • Extract data from various sources (databases, APIs, logs, etc.).
  • Transform and clean data as needed before loading it into Redshift.
  • Schedule and automate data ingestion processes.
  • Monitor and optimize data ingestion performance.

AWS Redshift:

  • Set up and configure Redshift clusters based on workload requirements.
  • Tune and optimize query performance through sort keys and distribution strategies.
  • Monitor and manage Redshift performance, including workload management and query optimization.
  • Implement security measures and access controls for Redshift.
  • Ensure high availability and disaster recovery for Redshift clusters.

ETL (Extract, Transform, Load):

  • Develop ETL workflows using AWS Glue, Apache Spark, or other relevant tools.
  • Transform and enrich data during the ETL process to meet business requirements.
  • Handle schema evolution and data versioning in ETL pipelines.
  • Monitor ETL job performance and troubleshoot issues.
  • Implement data lineage and metadata management.

Data Governance and Compliance:

  • Implement data governance practices, including data lineage, data cataloging, and data documentation.
  • Ensure compliance with data privacy and security regulations (e.g., GDPR).
  • Implement data retention policies and archiving strategies.

Automation and Monitoring:

  • Implement automation scripts and tools for managing data pipelines and workflows.
  • Set up monitoring and alerting for data pipeline failures and performance issues.
  • Conduct regular health checks and capacity planning for the data warehouse.

Documentation and Collaboration:

  • Maintain clear and up-to-date documentation for data processes, pipelines, and data models.
  • Collaborate with data analysts, data scientists, and business stakeholders to understand data requirements and deliver actionable insights.

Performance Tuning and Optimization:

  • Continuously optimize data warehouse performance through query tuning and resource management.
  • Implement Redshift best practices for workload management.
  • Identify and resolve bottlenecks in data pipelines and ETL processes.

Scalability and Cost Management:

  • Ensure the data warehouse infrastructure scales effectively to handle growing data volumes.
  • Monitor and manage costs associated with Redshift and other AWS services.
  • Implement cost-saving strategies without compromising performance.
  • Good knowledge of cybersecurity: penetration testing, DDoS attack prevention, TLS, PKI, etc.
  • Application lifecycle management, DevOps, CI and CD.
  • Experience in designing big data applications.
  • This individual should be self-driven, highly motivated, and organized, with strong analytical thinking and problem-solving skills and the ability to work on multiple projects and function in a team environment.