Custom queue mechanism for Data Factory pipelines

Ilse Epskamp
Published in Azure Tutorials
4 min read · Mar 7, 2022


Photo by Hal Gatewood on Unsplash

As a data engineer you might come across the requirement to queue your Data Factory pipelines, to ensure that no other instance of the same pipeline is already running. For example, your pipeline might run a configuration activity on the database or storage account dedicated to the current job, where running the same pipeline in parallel would cause the configurations to conflict. Currently there is no out-of-the-box feature in Data Factory to queue the runs of the same pipeline. A solution proposed on the web is to use the Web Activity in a so-called proxy pipeline to query the running instances of the pipeline. Only if no instances are active is the current job executed. In this blog post we demonstrate a possible implementation of this approach.

Scenario
When triggered multiple times, pipeline A may only run sequentially. A proxy pipeline checks whether pipeline A is available. If it is, the proxy pipeline triggers it; otherwise the proxy waits 1 minute and checks again.
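Before going into Data Factory, the queueing logic of this scenario can be sketched in plain Python. This is a minimal sketch only: `run_when_available`, `count_active_runs` and `trigger_target` are hypothetical names, and in the actual solution each step is performed by a Data Factory activity, not by Python code.

```python
import time
from typing import Callable


def run_when_available(
    count_active_runs: Callable[[], int],
    trigger_target: Callable[[], None],
    wait_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the target pipeline has no active runs, then trigger it.

    Returns the number of availability checks that were performed.
    """
    checks = 0
    while True:
        checks += 1
        if count_active_runs() == 0:
            # Target pipeline is available: stop polling.
            break
        # Target pipeline is busy: wait before checking again.
        sleep(wait_seconds)
    trigger_target()
    return checks
```

Passing `sleep` in as a parameter keeps the sketch testable; with the defaults it waits 60 seconds between checks, matching the 1-minute retry in the scenario.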

Step by Step guide

  1. Create a target pipeline
    Create the pipeline that the proxy pipeline will trigger. This is the pipeline of which only one instance may run at a time. We call it target_pipeline. For the purpose of this blog post, the pipeline has a single activity: a Wait activity of 2 minutes.
  2. Create a proxy pipeline
    We call this pipeline pipeline_check_availability. It has one pipeline variable and two top-level activities, the first of which contains several subactivities.
Proxy pipeline to check the availability of another pipeline before triggering it.
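For reference, the target pipeline from step 1 might look roughly like the JSON below, written as a Python dict so it can be printed or checked. The activity name is an assumption; only the single 2-minute Wait comes from this post.

```python
import json

# Sketch of target_pipeline's JSON definition (as seen in the Data Factory
# code view). The activity name "Wait 2 minutes" is hypothetical.
target_pipeline = {
    "name": "target_pipeline",
    "properties": {
        "activities": [
            {
                "name": "Wait 2 minutes",
                "type": "Wait",
                # The Wait activity takes its duration in seconds.
                "typeProperties": {"waitTimeInSeconds": 120},
            }
        ]
    },
}

if __name__ == "__main__":
    print(json.dumps(target_pipeline, indent=2))
```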

Pipeline Variables
activeRuns: the number of active runs of the target pipeline. Default value: 1.

Pipeline Variable “activeRuns”.
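In the pipeline's JSON, the variable could be declared as sketched below. This assumes activeRuns is a String variable, which is consistent with the expressions later in this post: they read it with int() and write it with string().

```python
import json

# Sketch of the "variables" section of pipeline_check_availability.
# Assumption: activeRuns is a String variable with default value "1".
proxy_variables = {
    "activeRuns": {
        "type": "String",
        "defaultValue": "1",
    }
}

if __name__ == "__main__":
    # The default is a string, so it needs converting before any numeric
    # comparison, just like int(variables('activeRuns')) in the pipeline.
    print(int(proxy_variables["activeRuns"]["defaultValue"]))  # prints 1
    print(json.dumps({"variables": proxy_variables}, indent=2))
```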

Activities
Until: as long as the target pipeline is not available, we keep running activities that check its availability.

Until activity

In the Settings tab, add the expression that determines when the loop stops:

@equals(int(variables('activeRuns')),0)

In other words, the activities within the Until block run repeatedly until the value of variable activeRuns equals 0. Because the default value of this variable is 1, the loop cannot end before the target pipeline's availability has been checked at least once.
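In JSON, the Until activity roughly corresponds to the sketch below. The activity name `Until Target Available` and the 12-hour `timeout` are assumptions; setting a timeout explicitly keeps the proxy pipeline from waiting forever if the target pipeline never becomes available. The four inner activities are described next and are left empty here.

```python
import json

# Sketch of the Until activity in pipeline_check_availability.
# Hypothetical: the activity name and the 12-hour timeout.
until_activity = {
    "name": "Until Target Available",
    "type": "Until",
    "typeProperties": {
        # The loop ends once activeRuns (a String variable) converts to 0.
        "expression": {
            "value": "@equals(int(variables('activeRuns')),0)",
            "type": "Expression",
        },
        # Timeout in d.hh:mm:ss format.
        "timeout": "0.12:00:00",
        # Web Activity, Filter, Set Variable and If Condition go here.
        "activities": [],
    },
}

if __name__ == "__main__":
    print(json.dumps(until_activity, indent=2))
```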

We add four subactivities to the Until activity:

Activities within the Until activity to check the target pipeline’s availability.
  1. Web Activity (named Get Active Runs), to get the active runs: send a POST request to the Azure Management API to query the runs of the target pipeline. Use the following body:
{
  "lastUpdatedAfter": "@{adddays(utcnow(),-2)}",
  "lastUpdatedBefore": "@{utcnow()}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": ["target_pipeline"]
    }
  ]
}
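This body is sent to the Pipeline Runs - Query By Factory operation of the Data Factory REST API (api-version 2018-06-01). The Python sketch below only builds the URL and body, so you can see what the Web Activity sends; it does not call Azure. The subscription, resource group and factory names are placeholders. A common setup, assumed here rather than taken from this post, is to authenticate the Web Activity with the factory's managed identity and resource https://management.azure.com/, which requires that identity to have read access to the factory.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple

# Data Factory REST API version for Pipeline Runs - Query By Factory.
API_VERSION = "2018-06-01"


def build_query_pipeline_runs_request(
    subscription_id: str,
    resource_group: str,
    factory_name: str,
    pipeline_name: str = "target_pipeline",
    now: Optional[datetime] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Return the URL and JSON body that the Web Activity POSTs (no request is sent)."""
    now = now or datetime.now(timezone.utc)
    url = (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.DataFactory"
        f"/factories/{factory_name}"
        f"/queryPipelineRuns?api-version={API_VERSION}"
    )
    body = {
        # Same two-day window as @{adddays(utcnow(),-2)} .. @{utcnow()}.
        "lastUpdatedAfter": (now - timedelta(days=2)).isoformat(),
        "lastUpdatedBefore": now.isoformat(),
        "filters": [
            {"operand": "PipelineName", "operator": "Equals", "values": [pipeline_name]}
        ],
    }
    return url, body


if __name__ == "__main__":
    url, body = build_query_pipeline_runs_request("<subscription-id>", "<resource-group>", "<factory-name>")
    print("POST", url)
    print(json.dumps(body, indent=2))
```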

  2. Filter activity (named Filter Running Pipelines), to filter the output of the Web Activity:

In the Settings tab, set the Items and Condition values:

Items: use the output of the Web Activity:

@activity('Get Active Runs').output.value

Condition: keep only the runs with status “InProgress” or “Queued”:

@or(equals(item().status,'InProgress'),equals(item().status,'Queued'))
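To make the Filter step concrete, here is a small Python equivalent applied to a trimmed, hypothetical response of the Get Active Runs call. The function name and the sample run IDs are made up for illustration; the status values are the ones used in the condition above.

```python
from typing import Any, Dict, List

# Run statuses that make the target pipeline "not available".
ACTIVE_STATUSES = {"InProgress", "Queued"}


def filter_running_pipelines(web_output: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Mimic the Filter activity: keep runs whose status is InProgress or Queued."""
    return [run for run in web_output.get("value", []) if run.get("status") in ACTIVE_STATUSES]


if __name__ == "__main__":
    # Hypothetical, trimmed output of the Web Activity.
    sample_output = {
        "value": [
            {"runId": "run-1", "pipelineName": "target_pipeline", "status": "Succeeded"},
            {"runId": "run-2", "pipelineName": "target_pipeline", "status": "InProgress"},
            {"runId": "run-3", "pipelineName": "target_pipeline", "status": "Queued"},
        ]
    }
    print([run["runId"] for run in filter_running_pipelines(sample_output)])
    # prints ['run-2', 'run-3']
```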

  3. Set Variable activity, to set the value of variable activeRuns:

We use the output of the Filter activity to count the number of active runs and assign this number to the variable activeRuns. As explained above, the Until activity uses this variable to determine whether the target pipeline is available. The count is converted with string() before it is stored:

@string(activity('Filter Running Pipelines').output.FilteredItemsCount)
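The three expressions that involve activeRuns can be mirrored in Python to show how the count travels from the Filter activity, through the String variable, into the Until check and the If Condition check described in the next step. The function names are hypothetical; each comment shows the Data Factory expression it mirrors.

```python
from typing import Any, List


def set_active_runs(filtered_items: List[Any]) -> str:
    # @string(activity('Filter Running Pipelines').output.FilteredItemsCount)
    return str(len(filtered_items))


def until_should_stop(active_runs: str) -> bool:
    # @equals(int(variables('activeRuns')),0)
    return int(active_runs) == 0


def target_is_busy(active_runs: str) -> bool:
    # @greater(int(variables('activeRuns')),0)
    return int(active_runs) > 0


if __name__ == "__main__":
    for filtered in ([{"status": "InProgress"}], []):
        value = set_active_runs(filtered)
        print(value, until_should_stop(value), target_is_busy(value))
    # prints "1 False True", then "0 True False"
```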

  4. If Condition activity, to determine whether the target pipeline is available. If activeRuns is greater than 0, the target pipeline is not available, so the True branch runs a Wait activity of 1 minute. After that minute, the Until activity starts its next iteration, because activeRuns is still not 0. If activeRuns is 0, the False branch contains no activities: the Until condition is now met and the loop ends. The If Condition uses this expression:

@greater(int(variables('activeRuns')),0)
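The If Condition with its Wait activity might look roughly like this in JSON (again written as a Python dict). The activity names, including `Set activeRuns` for the Set Variable activity, are hypothetical; the expression and the 1-minute wait come from this post.

```python
import json

# Sketch of the If Condition activity inside the Until loop.
# Hypothetical: all activity names.
if_condition_activity = {
    "name": "If Target Busy",
    "type": "IfCondition",
    "dependsOn": [
        {"activity": "Set activeRuns", "dependencyConditions": ["Succeeded"]}
    ],
    "typeProperties": {
        "expression": {
            "value": "@greater(int(variables('activeRuns')),0)",
            "type": "Expression",
        },
        # True branch: the target pipeline is busy, so wait before the next iteration.
        "ifTrueActivities": [
            {
                "name": "Wait 1 minute",
                "type": "Wait",
                "typeProperties": {"waitTimeInSeconds": 60},
            }
        ],
        # False branch stays empty: the Until condition ends the loop instead.
        "ifFalseActivities": [],
    },
}

if __name__ == "__main__":
    print(json.dumps(if_condition_activity, indent=2))
```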

Execute Pipeline: after the Until activity, add an Execute Pipeline activity that triggers target_pipeline. As soon as the Until activity ends because activeRuns equals 0, the target pipeline is triggered.
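A possible JSON shape for this last activity is sketched below. The activity names are hypothetical, and so is the choice of `waitOnCompletion`: here the proxy run stays open until target_pipeline finishes.

```python
import json

# Sketch of the Execute Pipeline activity that follows the Until loop.
# Hypothetical: the activity names and waitOnCompletion = True.
execute_pipeline_activity = {
    "name": "Execute Target Pipeline",
    "type": "ExecutePipeline",
    # Start only after the Until loop has finished successfully.
    "dependsOn": [
        {
            "activity": "Until Target Available",
            "dependencyConditions": ["Succeeded"],
        }
    ],
    "typeProperties": {
        "pipeline": {"referenceName": "target_pipeline", "type": "PipelineReference"},
        # Keep the proxy run open until target_pipeline completes.
        "waitOnCompletion": True,
    },
}

if __name__ == "__main__":
    print(json.dumps(execute_pipeline_activity, indent=2))
```

With `waitOnCompletion` set to false, the proxy run would end right after triggering; because the availability check looks at target_pipeline's own runs, the queueing works either way. Keep in mind that the check-then-trigger sequence is not atomic: two proxy runs that query the API at nearly the same moment can both see zero active runs and both trigger the target pipeline.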


Next steps

  • Determine the best wait time for your proxy pipeline. In this blog post we wait 1 minute before checking again; in a production environment a longer interval may be more suitable, depending on how long the target pipeline typically runs.
  • When working with multiple environments, parameterize environment-specific values, such as the subscription, resource group and factory name in the Web Activity URL.
  • You can check for more running pipelines by including additional pipeline names in the filter in the body of the Web Activity.
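The last two suggestions could be sketched as follows. The URL expression uses the `pipeline().DataFactory` system variable for the factory name and two hypothetical pipeline parameters, `subscriptionId` and `resourceGroup`. For several pipeline names, the helper switches the filter to the `In` operator; that choice is an assumption based on the REST API's filter operators (Equals, NotEquals, In, NotIn), so verify it against your own factory.

```python
from typing import Dict, List, Sequence

# Hypothetical dynamic-content expression for the Web Activity URL.
# subscriptionId and resourceGroup are assumed pipeline parameters;
# pipeline().DataFactory resolves to the name of the current factory.
QUERY_URL_EXPRESSION = (
    "@concat('https://management.azure.com/subscriptions/', "
    "pipeline().parameters.subscriptionId, "
    "'/resourceGroups/', pipeline().parameters.resourceGroup, "
    "'/providers/Microsoft.DataFactory/factories/', pipeline().DataFactory, "
    "'/queryPipelineRuns?api-version=2018-06-01')"
)


def pipeline_name_filter(pipeline_names: Sequence[str]) -> Dict[str, object]:
    """Build the PipelineName filter; use 'In' when checking several pipelines (assumption)."""
    names: List[str] = list(pipeline_names)
    if not names:
        raise ValueError("at least one pipeline name is required")
    operator = "Equals" if len(names) == 1 else "In"
    return {"operand": "PipelineName", "operator": operator, "values": names}


if __name__ == "__main__":
    print(QUERY_URL_EXPRESSION)
    print(pipeline_name_filter(["target_pipeline", "other_pipeline"]))
```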

Azure Tutorials frequently publishes tutorials, best practices, insights or updates about Azure Services, to contribute to the Azure Community. Azure Tutorials is driven by two enthusiastic Azure Cloud Engineers, combining over 15 years of IT experience in several domains. Stay tuned for weekly blog updates and follow us if you are interested!
https://www.linkedin.com/company/azure-tutorials

Azure Certified IT Engineer with 8+ years of experience in the banking industry. Focus areas: Azure, Data Engineering, DevOps, CI/CD, Automation, Python.