Azure Data Factory – Wrangling Data Flow (WDF)

Wrangling data flow integrates Power Query’s mashup experience into Azure Data Factory V2. It lets users build transformations in a very familiar user interface (and in the very familiar ‘M’ language), and then runs those transformations at scale via Spark execution. Refer to the WDF public documentation to learn how it differs from Mapping data flow and Power Query dataflows.

In this blog, I will explore the wrangling experience in Azure Data Factory. Power Query provides a rich set of transformations, but I would not expect all of them to be supported in WDF, given that it is currently in public preview.
Here is the list of all the Power Query transformations that are currently supported in WDF.
For the purpose of demonstration, I have taken an employee data-prep scenario, with two CSV data sources in Azure Blob storage. One of the sources contains employee info and the other contains employee salary details.

Similar to Mapping data flow, a Wrangling data flow can be created by clicking on the “Data flow” menu option.

Clicking on “Data flow” now presents the option for “Wrangling Data Flow”.

Configuring Wrangling data flow

Clicking on “OK” opens the Wrangling data flow window, which is essentially Power Query’s mashup editor.
All the source datasets become queries inside the “ADFResource” folder, and there is a “UserQuery” which points to one of the source datasets. “UserQuery” is where all the transformations will be done. Dataset queries should not be changed, since any change there will not be persisted.
Things to notice here:

  • Add/Delete/Rename of queries is not allowed.
  • All transformations should go in UserQuery, not in the dataset queries.
  • Power Query has a very rich set of transformations; WDF only supports a subset of them.
  • Whenever an unsupported transformation is used, WDF throws a validation error: “The wrangling data flow is invalid. Expression.Error: The transformation logic is not supported. Please try a simpler expression.”

Let’s do some transformations now.
We will join “UserQuery” (which is pointing to “EmployeeInfoDataset”) to “EmployeeSalaryDataset”. The “Combine tables” option can be used to combine tables.
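Behind the scenes, the join is expressed as a Table.NestedJoin step in ‘M’, joining the two tables on the “EmployeeId” column:

Table.NestedJoin(Source, {"EmployeeId"}, EmployeeSalaryDataset, {"EmployeeId"}, "EmployeeSalaryDataset", JoinKind.Inner)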

On doing the join, we immediately see an error. The error goes away once we expand the joined column.
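Expanding the joined column corresponds to a Table.ExpandTableColumn step, which pulls “BasePay” out of the nested table:

Table.ExpandTableColumn(JoinTransformation, "EmployeeSalaryDataset", {"BasePay"}, {"EmployeeSalaryDataset.BasePay"})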

Now we remove the quotes from all the string columns. This time, instead of using the UI, we will directly update the query. Here is the step I used to replace the quotes:

Table.ReplaceValue(RemoveColumns, """", "", Replacer.ReplaceText, {"FirstName", "LastName", "City", "ZIP", "Email", "State"}) 

The next step is to remove all the invalid emails. I added a conditional column. This is not the perfect way to validate emails, and I am sure there are better ways to do it.
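The conditional column simply checks that the email contains both an “@” and a “.”, and blanks out anything that doesn’t:

Table.AddColumn(#"Removed columns", "SanitizedEmail", each if not Text.Contains([Email], "@") then "" else if not Text.Contains([Email], ".") then "" else [Email])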

The next step is to combine two columns. Although there is a straightforward way to do this in Mashup/PQ (Table.CombineColumns), it does not work here because Table.CombineColumns is not supported in ADF. That said, there is another way to achieve it:

Table.AddColumn(RemoveEmailColumn, "Name", each [FirstName] & " " & [LastName])

Converting the column type (for DateOfJoining) from text to Date.
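This is a simple Table.TransformColumnTypes step:

Table.TransformColumnTypes(RemoveFirstLastName, {{"DateOfJoining", type date}})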

Adding a conditional column to categorize employees into senior and junior based on date of joining.
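Employees who joined on or before the cutoff date (5 Feb 2008 in this example) are marked as “Senior”, everyone else as “Junior”:

Table.AddColumn(ConvertToDate, "EmployeeType", each if [DateOfJoining] <= #date(2008, 2, 5) then "Senior" else "Junior")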

Convert “BasePay” to a number.
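Again, a Table.TransformColumnTypes step does the conversion:

Table.TransformColumnTypes(#"Reordered columns", {{"EmployeeSalaryDataset.BasePay", type number}})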

Now let’s get the average salary for each city.
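The aggregation is a Table.Group step, averaging “BasePay” per city:

Table.Group(RemoveRowsWithEmptyEmail, {"City"}, {{"AverageSalary", each List.Average([EmployeeSalaryDataset.BasePay]), type number}})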

All the above steps can be achieved by just typing the mashup in the “Advanced editor” of UserQuery.
Right click on UserQuery->Advanced editor.

Here is the mashup of the transformations that I used in this blog.

let
  Source = EmployeeInfoDataset,
  JoinTransformation = Table.NestedJoin(Source, {"EmployeeId"}, EmployeeSalaryDataset, {"EmployeeId"}, "EmployeeSalaryDataset", JoinKind.Inner),
  ExpandColumns = Table.ExpandTableColumn(JoinTransformation, "EmployeeSalaryDataset", {"BasePay"}, {"EmployeeSalaryDataset.BasePay"}),
  RemoveColumns = Table.RemoveColumns(ExpandColumns, {"EmployeeId"}),
  ReplacedQuotes = Table.ReplaceValue(RemoveColumns, """", "", Replacer.ReplaceText, {"FirstName", "LastName", "City", "ZIP", "Email", "State"}),
  ConvertToNumber = Table.TransformColumnTypes(ReplacedQuotes, {{"ZIP", type number}}),
  AddConditionColumnReplaceNullWithZero = Table.AddColumn(ConvertToNumber, "SanitizedZip", each if [ZIP] = null then 0 else [ZIP]),
  #"Removed columns" = Table.RemoveColumns(AddConditionColumnReplaceNullWithZero, {"ZIP"}),
  #"Inserted conditional column" = Table.AddColumn(#"Removed columns", "SanitizedEmail", each if not Text.Contains([Email], "@") then "" else if not Text.Contains([Email], ".") then "" else [Email]),
  RemoveEmailColumn = Table.RemoveColumns(#"Inserted conditional column", {"Email"}),
  MergeColumn = Table.AddColumn(RemoveEmailColumn, "Name", each [FirstName] & " " & [LastName]),
  RemoveFirstLastName = Table.RemoveColumns(MergeColumn, {"FirstName", "LastName"}),
  ConvertToDate = Table.TransformColumnTypes(RemoveFirstLastName, {{"DateOfJoining", type date}}),
  ConditionColumnEmpType = Table.AddColumn(ConvertToDate, "EmployeeType", each if [DateOfJoining] <= #date(2008, 2, 5) then "Senior" else "Junior"),
  #"Reordered columns" = Table.ReorderColumns(ConditionColumnEmpType, {"Name", "SanitizedEmail", "DateOfJoining", "EmployeeType", "EmployeeSalaryDataset.BasePay", "SanitizedZip", "City", "State"}),
  ConvertIdToNumber = Table.TransformColumnTypes(#"Reordered columns", {{"EmployeeSalaryDataset.BasePay", type number}}),
  RemoveRowsWithEmptyEmail = Table.SelectRows(ConvertIdToNumber, each [SanitizedEmail] <> null and [SanitizedEmail] <> ""),
  AverageSalaryByCity = Table.Group(RemoveRowsWithEmptyEmail, {"City"}, {{"AverageSalary", each List.Average([EmployeeSalaryDataset.BasePay]), type number}}),
  #"Filtered rows" = Table.SelectRows(AverageSalaryByCity, each [AverageSalary] <= 200000)
in
  #"Filtered rows"

Once the Wrangling data flow is created, it can be used in an ADF pipeline and executed at scale on Spark.
