Run integration test between Data Factory and Databricks in Azure DevOps CI pipeline

Ilse Epskamp

A common setup for Azure data workflows is using Data Factory as the orchestrator for running Databricks notebooks. For example, using Blob event triggers, pipelines are executed on receipt of data in Blob/ADLS, after which the pipeline passes relevant information to Databricks*. Next, some logic is executed in Databricks. To ensure a successful workflow, the notebooks that you refer to in your Data Factory pipelines should obviously exist in Databricks. You could say that is a no-brainer; however, as time passes after the initial development and your application grows, you might lose sight of the connections between your pipelines and notebooks. Imagine cleaning up redundant notebooks, not realizing some pipelines were still referencing them… your pipeline will fail, which could cause a chain of unwanted events. In this blog I demonstrate an approach to test the integration of Data Factory and Databricks as part of your CI pipeline, so you can catch this potential issue before deployment.

*Want to learn how to pass parameters between Data Factory and Databricks? Check out this blog!
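
For reference, this is roughly what a Databricks notebook activity looks like in an exported Data Factory pipeline definition, trimmed to the fields the test below relies on (pipeline, activity and path names are illustrative):

{
  "name": "examplepipeline",
  "properties": {
    "activities": [
      {
        "name": "run_notebook",
        "type": "DatabricksNotebook",
        "typeProperties": {
          "notebookPath": "/ingest/process_blob"
        }
      }
    ]
  }
}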

Run PowerShell script with integration test during CI

I am using PowerShell to create the integration test. The script runs during CI. The test takes its input from the Azure DevOps repository, where pipeline definitions and notebook files are stored in separate directories:

$work_dir=$env:SYSTEM_DEFAULTWORKINGDIRECTORY
$dir_adb=$work_dir + '\src\databricks\notebooks'
$dir_adf_pipelines=$work_dir + '\src\datafactory\pipelines'
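
These paths assume a repository layout along the following lines (illustrative; your structure may differ):

src/
  databricks/
    notebooks/          <- notebook files, e.g. process_blob.py
  datafactory/
    pipelines/          <- one JSON file per pipeline
    datasets/
  powershell/
    ci_test_adb_adf.ps1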

Now let’s build up the script. First, we collect all the pipelines and notebooks from the repository in arrays.

$arr_notebooks=@()
$arr_pipelines=@()

#list all adb notebooks, adf pipeline sourcefiles and adf datasets
$arr_notebooks=@(Get-ChildItem -Path $dir_adb)
$arr_pipelines=@(Get-ChildItem -Path $dir_adf_pipelines)

In the notebooks array ($arr_notebooks) the names of the notebooks contain the file extension, for example “.py”. However, when a Data Factory pipeline refers to a notebook, it refers to the notebook filename without the extension. So next, we remove the extension from the notebook names and save them in a new array; e.g. a file named process_blob.py yields process_blob:

$arr_notebooks_rm_ext=[System.Collections.ArrayList]@()

#remove extensions from notebook filenames
foreach($notebook in $arr_notebooks)
{
    #BaseName strips only the final extension, so dots inside a filename are preserved
    $notebookName=$notebook.BaseName
    $arr_notebooks_rm_ext.Add($notebookName) > $null
}

Now we iterate over all the pipeline definitions to find where Databricks notebook activities are referenced. In this example, I check for activity type “DatabricksNotebook”, which can be referenced from the root of the pipeline or from within a Switch activity. For every reference that is found, I check whether the referenced notebook name** exists in the notebooks array we extracted from our repo. If the notebook is not found, we add an error to our errorlog variable and print which pipeline is causing the issue, including the name of the notebook that was not found.

**You should extract the notebook name from the notebook path. The way to achieve this depends on the format of your path.
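
For instance, with a two-level workspace path like /ingest/process_blob (an illustrative path), splitting on “/” yields an empty first element, then the folder, then the notebook name, which is why the script below reads indexes 1 and 2:

#example: extract folder and notebook name from a path like "/ingest/process_blob"
$notebookPath="/ingest/process_blob"
$notebookPath_split=$notebookPath.Split("/")   # "", "ingest", "process_blob"
$notebookFolder=$notebookPath_split[1]         # "ingest"
$notebookName=$notebookPath_split[2]           # "process_blob"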

$errorlog=[System.Collections.ArrayList]@()

#read pipeline json files and perform checks
foreach($pipeline in $arr_pipelines)
{
    $dir_pipeline=$dir_adf_pipelines+'\'+$pipeline
    $pipelinejson=Get-Content -Raw -Path $dir_pipeline | ConvertFrom-Json

    foreach($activity in $pipelinejson.properties.activities)
    {
        #collect the notebookPath(s) referenced by this activity;
        #resetting the array per activity avoids re-checking a stale path
        #and supports a Switch with multiple notebook cases
        $notebookPaths=@()
        if($activity.type -eq "DatabricksNotebook")
        {
            $notebookPaths+=$activity.typeProperties.notebookPath
        }
        elseif($activity.type -eq "Switch")
        {
            foreach($case in $activity.typeProperties.cases)
            {
                foreach($case_activity in $case.activities)
                {
                    if($case_activity.type -eq "DatabricksNotebook")
                    {
                        $notebookPaths+=$case_activity.typeProperties.notebookPath
                    }
                }
            }
        }

        foreach($notebookPath in $notebookPaths)
        {
            $notebookPath_split=$notebookPath.Split("/")
            $notebookFolder=$notebookPath_split[1]
            $notebookName=$notebookPath_split[2]

            #check if notebook triggered in adf pipeline exists in src/databricks
            if(-not ($arr_notebooks_rm_ext.Contains($notebookName)))
            {
                $error_msg="An activity in pipeline "+$pipeline+" refers to a non-existing notebook: "+$notebookName
                $errorlog.Add($error_msg) > $null
                Write-Host $error_msg
            }
        }
    }
}

We create a variable “testresult” and assign it the value 0. Next, we check whether the errorlog variable contains any errors. If not, no issues were found and the test passes. If we did find errors, we assign 1 to our “testresult” variable.

$testresult=0

#update output variable $testresult in case of findings
if($errorlog.count -gt 0)
{
    $testresult=1
    Write-Host $errorlog.count "error(s) found. Details are provided above."
}

Lastly, we return the “testresult” variable, which is either 0 or 1, to our CI pipeline. Because the variable is set with isOutput=true, the PowerShell step needs a name (runTest in the YAML below) so that other jobs can reference it.

#return $testresult to ci pipeline
"##vso[task.setvariable variable=integrationTestResult;isOutput=true]$testresult"

The CI pipeline fails if the test result of the integration test equals 1:

- stage: 'IntegrationTest_Databricks_DataFactory'
  pool:
    vmImage: 'windows-2019'
  jobs:
  - job: 'Run_integration_test'
    steps:
    - task: PowerShell@2
      name: 'runTest'
      inputs:
        targetType: filePath
        filePath: './src/powershell/ci_test_adb_adf.ps1'

  - job: 'Assess_test_result'
    variables:
      testresult: $[dependencies.Run_integration_test.outputs['runTest.integrationTestResult']]
    dependsOn: 'Run_integration_test'
    steps:
    - powershell: |
        if($env:testresult -eq 1)
        {
          Write-Host "Integration test failed. Please refer to job runTest for more details."
          exit 1
        }
      displayName: 'Assess test result'

That’s it! Additionally, you can add more checks, for example:

  • assess whether certain naming conventions are applied to the names of pipeline activities, or to variable names (see the sketch after this list)
  • assess whether certain activities are used
  • include checks on datasets
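
As a minimal sketch of the first idea, assuming a convention of lowercase names with underscores (the regex is an assumption; adjust it to your own convention), you could add the following inside the pipeline loop:

#flag pipeline activities whose names violate the naming convention (illustrative regex)
foreach($activity in $pipelinejson.properties.activities)
{
    if($activity.name -notmatch '^[a-z0-9_]+$')
    {
        $error_msg="Activity "+$activity.name+" in pipeline "+$pipeline+" violates the naming convention"
        $errorlog.Add($error_msg) > $null
        Write-Host $error_msg
    }
}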

Please refer to the full script below:

# ----------------------------------------------------------------------------------------------------------
# Description: Integration test between Data Factory and Databricks code in CI
# ----------------------------------------------------------------------------------------------------------

try{

#define directories and output variables
$work_dir=$env:SYSTEM_DEFAULTWORKINGDIRECTORY
$dir_adb=$work_dir + '\src\databricks\notebooks'
$dir_adf_pipelines=$work_dir + '\src\datafactory\pipelines'
$dir_adf_datasets=$work_dir + '\src\datafactory\datasets'
$errorlog=[System.Collections.ArrayList]@()
$arr_notebooks=@()
$arr_pipelines=@()
$arr_notebooks_rm_ext=[System.Collections.ArrayList]@()
$testresult=0

#list all adb notebooks, adf pipeline sourcefiles and adf datasets
$arr_notebooks=@(Get-ChildItem -Path $dir_adb)
$arr_pipelines=@(Get-ChildItem -Path $dir_adf_pipelines)
$arr_datasets=@(Get-ChildItem -Path $dir_adf_datasets)

#remove extensions from notebook filenames
foreach($notebook in $arr_notebooks)
{
    #BaseName strips only the final extension, so dots inside a filename are preserved
    $notebookName=$notebook.BaseName
    $arr_notebooks_rm_ext.Add($notebookName) > $null
}

#read pipeline json files and perform checks
foreach($pipeline in $arr_pipelines)
{
    $dir_pipeline=$dir_adf_pipelines+'\'+$pipeline
    $pipelinejson=Get-Content -Raw -Path $dir_pipeline | ConvertFrom-Json

    foreach($activity in $pipelinejson.properties.activities)
    {
        #collect the notebookPath(s) referenced by this activity;
        #resetting the array per activity avoids re-checking a stale path
        #and supports a Switch with multiple notebook cases
        $notebookPaths=@()
        if($activity.type -eq "DatabricksNotebook")
        {
            $notebookPaths+=$activity.typeProperties.notebookPath
        }
        elseif($activity.type -eq "Switch")
        {
            foreach($case in $activity.typeProperties.cases)
            {
                foreach($case_activity in $case.activities)
                {
                    if($case_activity.type -eq "DatabricksNotebook")
                    {
                        $notebookPaths+=$case_activity.typeProperties.notebookPath
                    }
                }
            }
        }

        foreach($notebookPath in $notebookPaths)
        {
            $notebookPath_split=$notebookPath.Split("/")
            $notebookFolder=$notebookPath_split[1]
            $notebookName=$notebookPath_split[2]

            #check if notebook triggered in adf pipeline exists in src/databricks
            if(-not ($arr_notebooks_rm_ext.Contains($notebookName)))
            {
                $error_msg="An activity in pipeline "+$pipeline+" refers to a non-existing notebook: "+$notebookName
                $errorlog.Add($error_msg) > $null
                Write-Host $error_msg
            }
        }
    }
}

#update output variable $testresult in case of findings
if($errorlog.count -gt 0)
{
    $testresult=1
    Write-Host $errorlog.count "error(s) found. Details are provided above."
}

#return $testresult to ci pipeline
"##vso[task.setvariable variable=integrationTestResult;isOutput=true]$testresult"
}

catch
{
    #fail the task explicitly so a script error cannot slip through as a passing test
    Write-Host "Error: Script not executed"
    Write-Host $_.Exception
    exit 1
}
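
To try the script locally before wiring it into CI, you can set the working-directory variable that the Azure DevOps agent would normally provide (the repository path below is a placeholder):

#SYSTEM_DEFAULTWORKINGDIRECTORY is normally set by the Azure DevOps agent
$env:SYSTEM_DEFAULTWORKINGDIRECTORY="C:\repos\my-data-platform"
.\src\powershell\ci_test_adb_adf.ps1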

Thanks for reading!

