In this tutorial, we’ll see how to use LUSID’s pre-built data quality check (DQ check) worker to build a workflow that automates a DQ check.
Let’s imagine we want to automate running the check and handling the results of the simple DQ check we built in this tutorial. To do this, we first create a check task definition and an exception task definition for handling the results. We can then automate runs of the workflow via the LUSID web app and easily inspect the results.
Note
To complete this tutorial, you must have suitable access control permissions. This can most easily be achieved by assigning your LUSID user the lusid-administrator role.
(Coming soon) You can follow this tutorial in Python using the data quality checks tutorial Jupyter notebook. We recommend running the notebook in your LUSID JupyterHub instance, which handles authentication to LUSID automatically.
Step 1: Create exception task definition
We must first create a task definition for exceptions - we’ll need to reference this exception task definition when we create the main DQ check task definition. To create the task definition, we must call the CreateTaskDefinition API, passing in the following:
A scope and code that together uniquely identify the task definition
A friendly displayName and description
A set of possible states; in our example we need Pending, InProgress, and Resolved
A fieldSchema defining a set of fields; in our example we need to pass in all of the fields from a check result, such as the severity and ruleFormula
An initialState of Pending
A set of triggers to prompt state transitions in the workflow
The actions that can occur within a state transition; in our example, we need to trigger the parent task once the exception is resolved
A set of state transitions to control movement from the initial state to all the other states
curl -X POST 'https://<your-domain>.lusid.com/workflow/api/taskdefinitions' \
-H 'Authorization: Bearer <your-api-access-token>' \
-H 'Content-Type: application/json-patch+json' \
-d '{
"id": {
"scope": "Finbourne-Examples",
"code": "DQ-Check-exception"
},
"displayName": "DQ Exception",
"description": "An exception returned by a data quality check.",
"states": [
{
"name": "Pending",
"displayName": "Pending",
"description": "Pending"
},
{
"name": "InProgress",
"displayName": "In progress",
"description": "In progress"
},
{
"name": "Resolved",
"displayName": "Resolved",
"description": "Resolved"
}
],
"fieldSchema": [
{
"name": "checkDefinitionScope",
"type": "String",
"displayName": "CheckDef Scope"
},
{
"name": "checkDefinitionCode",
"type": "String",
"displayName": "CheckDef Code"
},
{
"name": "checkDefinitionDisplayName",
"type": "String",
"displayName": "CheckDef Name"
},
{
"name": "checkRunAsAt",
"type": "DateTime",
"displayName": "Run AsAt"
},
{
"name": "resultType",
"type": "String",
"displayName": "Result Type"
},
{
"name": "rulesetKey",
"type": "String",
"displayName": "Ruleset Key"
},
{
"name": "rulesetDisplayName",
"type": "String",
"displayName": "Ruleset Name"
},
{
"name": "ruleKey",
"type": "String",
"displayName": "Rule Key"
},
{
"name": "ruleDisplayName",
"type": "String",
"displayName": "Rule Name"
},
{
"name": "ruleDescription",
"type": "String",
"displayName": "Rule Description"
},
{
"name": "ruleFormula",
"type": "String",
"displayName": "Rule Formula"
},
{
"name": "severity",
"type": "Decimal",
"displayName": "Severity"
},
{
"name": "resultId",
"type": "String",
"displayName": "Result Tracking ID"
},
{
"name": "entityType",
"type": "String",
"displayName": "Entity Type"
},
{
"name": "instrumentAsAt",
"type": "DateTime",
"displayName": "Instrument As At"
},
{
"name": "instrumentEffectiveAt",
"type": "DateTime",
"displayName": "Instrument Effective At"
},
{
"name": "instrumentScope",
"type": "String",
"displayName": "Instrument Scope"
},
{
"name": "identifierType",
"type": "String",
"displayName": "Identifier Type"
},
{
"name": "identifierValue",
"type": "String",
"displayName": "Identifier Value"
},
{
"name": "instrumentName",
"type": "String",
"displayName": "Instrument Name"
},
{
"name": "entityUniqueId",
"type": "String",
"displayName": "Entity Unique ID"
},
{
"name": "countRuleBreaches",
"type": "Decimal",
"displayName": "Number of Breaches"
},
{
"name": "errorDetail",
"type": "String",
"displayName": "Error Message"
}
],
"initialState": {
"name": "Pending",
"requiredFields": []
},
"triggers": [
{
"name": "start",
"trigger": {
"type": "External"
}
},
{
"name": "resolve",
"trigger": {
"type": "External"
}
}
],
"actions": [
{
"name": "resolve-parent",
"actionDetails": {
"type": "TriggerParentTask",
"trigger": "resolve"
}
}
],
"transitions": [
{
"fromState": "Pending",
"toState": "InProgress",
"trigger": "start"
},
{
"fromState": "InProgress",
"toState": "Resolved",
"trigger": "resolve",
"action": "resolve-parent"
}
]
}'
# Connect to the Workflow API
import lusid_workflow as lw
import lusid_workflow.api as lwa
import lusid_workflow.models as lwm
import lusid_workflow.extensions as lwe
from lusidjam import RefreshingToken  # token helper used in LUSID JupyterHub notebooks
workflow_api_factory = lwe.SyncApiClientFactory(config_loaders=[
lwe.ArgsConfigurationLoader(
api_url="https://<your-domain>.lusid.com/workflow",
access_token=RefreshingToken(),
app_name="LusidJupyterNotebook"
)
])
# Create the exception task definition
task_definitions_api = workflow_api_factory.build(lwa.TaskDefinitionsApi)
tasks_api = workflow_api_factory.build(lwa.TasksApi)
task_definitions_api.create_task_definition(
create_task_definition_request=lwm.CreateTaskDefinitionRequest(
id=lwm.ResourceId(scope="Finbourne-Examples", code="DQ-Check-exception"),
display_name="DQ Exception",
description="An exception returned by a data quality check.",
states=[lwm.TaskStateDefinition(name=n, display_name=n, description=n) for n in ["Pending", "InProgress", "Resolved"]],
field_schema=[
lwm.TaskFieldDefinition(name=n, type=t, display_name=d) for n, t, d in [
("checkDefinitionScope", "String", "CheckDef Scope"),
("checkDefinitionCode", "String", "CheckDef Code"),
("checkDefinitionDisplayName", "String", "CheckDef Name"),
("checkRunAsAt", "DateTime", "Run AsAt"),
("resultType", "String", "Result Type"),
("rulesetKey", "String", "Ruleset Key"),
("rulesetDisplayName", "String", "Ruleset Name"),
("ruleKey", "String", "Rule Key"),
("ruleDisplayName", "String", "Rule Name"),
("ruleDescription", "String", "Rule Description"),
("ruleFormula", "String", "Rule Formula"),
("severity", "Decimal", "Severity"),
("resultId", "String", "Result Tracking ID"),
("entityType", "String", "Entity Type"),
("instrumentAsAt", "DateTime", "Instrument As At"),
("instrumentEffectiveAt", "DateTime", "Instrument Effective At"),
("instrumentScope", "String", "Instrument Scope"),
("identifierType", "String", "Identifier Type"),
("identifierValue", "String", "Identifier Value"),
("instrumentName", "String", "Instrument Name"),
("entityUniqueId", "String", "Entity Unique ID"),
("countRuleBreaches", "Decimal", "Number of Breaches"),
("errorDetail", "String", "Error Message"),
]
],
initial_state=lwm.InitialState(name="Pending", required_fields=[]),
triggers=[lwm.TransitionTriggerDefinition(name=n, trigger=lwm.TriggerSchema(type="External")) for n in ["start", "resolve"]],
actions=[
lwm.ActionDefinition(
name="resolve-parent",
action_details=lwm.ActionDetails(
lwm.TriggerParentTaskAction(type="TriggerParentTask", trigger="resolve")
)
)
],
transitions=[
lwm.TaskTransitionDefinition(from_state="Pending", to_state="InProgress", trigger="start"),
lwm.TaskTransitionDefinition(from_state="InProgress", to_state="Resolved", trigger="resolve", action="resolve-parent"),
]
)
)
Once created, we can inspect a visualised diagram of the task definition via Workflow Service > Tasks in the LUSID web app.
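Before sending a definition like the one above, it can help to check locally that the state machine is self-consistent. The following is a minimal sketch using only the Python standard library; the helper name find_definition_problems is hypothetical and not part of any LUSID SDK, and the checks reflect our reading of the request body rather than LUSID's own validation rules.

```python
from typing import Any


def find_definition_problems(definition: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a task definition payload.

    Hypothetical local helper: it only checks that names referenced by the
    initial state and the transitions are actually declared.
    """
    states = {s["name"] for s in definition.get("states", [])}
    triggers = {t["name"] for t in definition.get("triggers", [])}
    actions = {a["name"] for a in definition.get("actions", [])}
    problems: list[str] = []

    initial = definition.get("initialState", {}).get("name")
    if initial not in states:
        problems.append(f"initialState '{initial}' is not a declared state")

    for t in definition.get("transitions", []):
        label = f"{t.get('fromState')} -> {t.get('toState')}"
        for key in ("fromState", "toState"):
            if t.get(key) not in states:
                problems.append(f"{label}: {key} '{t.get(key)}' is not a declared state")
        if t.get("trigger") not in triggers:
            problems.append(f"{label}: trigger '{t.get('trigger')}' is not declared")
        action = t.get("action")  # actions are optional on a transition
        if action is not None and action not in actions:
            problems.append(f"{label}: action '{action}' is not declared")
    return problems


# The state machine parts of the exception task definition from step 1.
exception_definition = {
    "states": [{"name": n} for n in ("Pending", "InProgress", "Resolved")],
    "initialState": {"name": "Pending", "requiredFields": []},
    "triggers": [
        {"name": n, "trigger": {"type": "External"}} for n in ("start", "resolve")
    ],
    "actions": [
        {
            "name": "resolve-parent",
            "actionDetails": {"type": "TriggerParentTask", "trigger": "resolve"},
        }
    ],
    "transitions": [
        {"fromState": "Pending", "toState": "InProgress", "trigger": "start"},
        {
            "fromState": "InProgress",
            "toState": "Resolved",
            "trigger": "resolve",
            "action": "resolve-parent",
        },
    ],
}

print(find_definition_problems(exception_definition))  # prints []
```

An empty list means every transition refers to a declared state, trigger, and action. The same helper can be pointed at the parent definition in step 2.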
Step 2: Create DQ check task definition
To create the parent task definition, we need to call the CreateTaskDefinition API, passing in the following:
Most importantly, the actions that can occur within a state transition; in this task definition, we need to trigger LUSID’s built-in worker (scope: default / code: LusidEntityDataQuality) that can run the data quality check we specify and create an exception task for each result
A scope and code that together uniquely identify the task definition
A friendly displayName and description
A set of possible states
A fieldSchema defining all of the fields we want to pass in when kicking off a new run of our check definition, such as the checkDefinitionScope/Code and instrumentScope
An initialState of Pending
A set of triggers to prompt state transitions in the workflow
A set of state transitions to control movement from the initial state to all the other states
curl -X POST 'https://<your-domain>.lusid.com/workflow/api/taskdefinitions' \
-H 'Authorization: Bearer <your-api-access-token>' \
-H 'Content-Type: application/json-patch+json' \
-d '
{
"id": {
"scope": "Finbourne-Examples",
"code": "DQ-Check-CheckInstruments"
},
"displayName": "DQ Check Instruments",
"description": "Runs data quality checks for instruments.",
"states": [
{
"name": "Pending",
"displayName": "Pending",
"description": "Pending"
},
{
"name": "InProgress",
"displayName": "In progress",
"description": "In progress"
},
{
"name": "ExceptionManagement",
"displayName": "Exception Management",
"description": "Exception Management"
},
{
"name": "Complete",
"displayName": "Complete",
"description": "Complete"
},
{
"name": "Error",
"displayName": "Error",
"description": "Error"
}
],
"fieldSchema": [
{
"name": "checkDefinitionScope",
"type": "String",
"displayName": "CD Scope"
},
{
"name": "checkDefinitionCode",
"type": "String",
"displayName": "CD Code"
},
{
"name": "instrumentScope",
"type": "String",
"displayName": "Instrument Scope"
},
{
"name": "selectorAttribute",
"type": "String",
"displayName": "Selector Attribute"
},
{
"name": "selectorValue",
"type": "String",
"displayName": "Selector Value"
},
{
"name": "preferredIdentifier",
"type": "String",
"displayName": "Preferred Identifier"
},
{
"name": "ruleBreachLimit",
"type": "Decimal",
"displayName": "Rule Breach Limit"
}
],
"initialState": {
"name": "Pending",
"requiredFields": []
},
"triggers": [
{
"name": "start",
"trigger": {
"type": "External"
}
},
{
"name": "exceptions",
"trigger": {
"type": "External"
}
},
{
"name": "no_exceptions",
"trigger": {
"type": "External"
}
},
{
"name": "resolve",
"trigger": {
"type": "External"
}
},
{
"name": "error",
"trigger": {
"type": "External"
}
}
],
"actions": [
{
"name": "run-checks",
"actionDetails": {
"type": "RunWorker",
"workerId": {
"scope": "default",
"code": "LusidEntityDataQuality"
},
"workerParameters": {
"checkDefinitionScope": {
"mapFrom": "checkDefinitionScope",
"setTo": null
},
"checkDefinitionCode": {
"mapFrom": "checkDefinitionCode",
"setTo": null
},
"dataSetScope": {
"mapFrom": "instrumentScope",
"setTo": null
},
"selectorAttribute": {
"mapFrom": "selectorAttribute",
"setTo": null
},
"selectorValue": {
"mapFrom": "selectorValue",
"setTo": null
},
"returnIdentifierKey": {
"mapFrom": "preferredIdentifier",
"setTo": null
},
"limitIndividualBreachesPerRule": {
"mapFrom": "ruleBreachLimit",
"setTo": null
}
},
"workerStatusTriggers": {
"started": null,
"completedWithResults": "exceptions",
"completedNoResults": "no_exceptions",
"failedToStart": "error",
"failedToComplete": "error"
},
"childTaskConfigurations": [
{
"taskDefinitionId": {
"scope": "Finbourne-Examples",
"code": "DQ-Check-exception"
},
"mapStackingKeyFrom": "resultId",
"childTaskFields": {
"checkDefinitionScope": {
"mapFrom": "checkDefinitionScope",
"setTo": null
},
"checkDefinitionCode": {
"mapFrom": "checkDefinitionCode",
"setTo": null
},
"checkDefinitionDisplayName": {
"mapFrom": "checkDefinitionDisplayName",
"setTo": null
},
"checkRunAsAt": {
"mapFrom": "checkRunAsAt",
"setTo": null
},
"resultType": {
"mapFrom": "resultType",
"setTo": null
},
"rulesetKey": {
"mapFrom": "rulesetKey",
"setTo": null
},
"rulesetDisplayName": {
"mapFrom": "rulesetDisplayName",
"setTo": null
},
"ruleKey": {
"mapFrom": "ruleKey",
"setTo": null
},
"ruleDisplayName": {
"mapFrom": "ruleDisplayName",
"setTo": null
},
"ruleDescription": {
"mapFrom": "ruleDescription",
"setTo": null
},
"ruleFormula": {
"mapFrom": "ruleFormula",
"setTo": null
},
"resultId": {
"mapFrom": "resultId",
"setTo": null
},
"entityType": {
"mapFrom": "lusidEntityType",
"setTo": null
},
"instrumentAsAt": {
"mapFrom": "lusidEntityAsAt",
"setTo": null
},
"instrumentEffectiveAt": {
"mapFrom": "lusidEntityEffectiveAt",
"setTo": null
},
"instrumentScope": {
"mapFrom": "lusidEntityScope",
"setTo": null
},
"identifierType": {
"mapFrom": "lusidEntityIdentifierKey",
"setTo": null
},
"identifierValue": {
"mapFrom": "lusidEntityIdentifierValue",
"setTo": null
},
"instrumentName": {
"mapFrom": "lusidEntityDisplayName",
"setTo": null
},
"entityUniqueId": {
"mapFrom": "lusidEntityUniqueId",
"setTo": null
},
"severity": {
"mapFrom": "severity",
"setTo": null
},
"countRuleBreaches": {
"mapFrom": "countRuleBreaches",
"setTo": null
},
"errorDetail": {
"mapFrom": "errorDetail",
"setTo": null
}
},
"resultMatchingPattern": null,
"initialTrigger": null
}
],
"workerTimeout": null
}
}
],
"transitions": [
{
"fromState": "Pending",
"toState": "InProgress",
"trigger": "start",
"action": "run-checks"
},
{
"fromState": "InProgress",
"toState": "Complete",
"trigger": "no_exceptions"
},
{
"fromState": "InProgress",
"toState": "ExceptionManagement",
"trigger": "exceptions"
},
{
"fromState": "ExceptionManagement",
"toState": "Complete",
"trigger": "resolve",
"guard": "childTasks all (state eq '\''Resolved'\'')"
},
{
"fromState": "InProgress",
"toState": "Error",
"trigger": "error"
}
]
}
'
# Helper that maps a target field from a named source field
def map_from(field):
    return lwm.FieldMapping(map_from=field, set_to=None)
task_definitions_api.create_task_definition(
create_task_definition_request=lwm.CreateTaskDefinitionRequest(
id=lwm.ResourceId(scope="Finbourne-Examples", code="DQ-Check-CheckInstruments"),
display_name="DQ Check Instruments",
description="Runs data quality checks for instruments.",
states=[lwm.TaskStateDefinition(name=n, display_name=n, description=n) for n in ["Pending", "InProgress", "ExceptionManagement", "Complete", "Error"]],
field_schema=[
lwm.TaskFieldDefinition(name=n, type=t, display_name=d) for n, t, d in [
("checkDefinitionScope", "String", "CD Scope"),
("checkDefinitionCode", "String", "CD Code"),
("instrumentScope", "String", "Instrument Scope"),
("selectorAttribute", "String", "Selector Attribute"),
("selectorValue", "String", "Selector Value"),
("preferredIdentifier", "String", "Preferred Identifier"),
("ruleBreachLimit", "Decimal", "Rule Breach Limit"),
]
],
initial_state=lwm.InitialState(name="Pending", required_fields=[]),
triggers=[lwm.TransitionTriggerDefinition(name=n, trigger=lwm.TriggerSchema(type="External")) for n in ["start", "exceptions", "no_exceptions", "resolve", "error"]],
actions=[
lwm.ActionDefinition(
name="run-checks",
action_details=lwm.ActionDetails(
lwm.RunWorkerAction(
type="RunWorker",
worker_id=lwm.ResourceId(scope="default", code="LusidEntityDataQuality"),
worker_parameters={
"checkDefinitionScope": map_from("checkDefinitionScope"),
"checkDefinitionCode": map_from("checkDefinitionCode"),
"dataSetScope": map_from("instrumentScope"),
"selectorAttribute": map_from("selectorAttribute"),
"selectorValue": map_from("selectorValue"),
"returnIdentifierKey": map_from("preferredIdentifier"),
"limitIndividualBreachesPerRule": map_from("ruleBreachLimit"),
},
worker_status_triggers=lwm.WorkerStatusTriggers(
started=None,
completed_with_results="exceptions",
completed_no_results="no_exceptions",
failed_to_start="error",
failed_to_complete="error"
),
child_task_configurations=[
lwm.ResultantChildTaskConfiguration(
task_definition_id=lwm.ResourceId(scope="Finbourne-Examples", code="DQ-Check-exception"),
map_stacking_key_from="resultId",
child_task_fields={k: map_from(v) for k, v in [
("checkDefinitionScope", "checkDefinitionScope"),
("checkDefinitionCode", "checkDefinitionCode"),
("checkDefinitionDisplayName", "checkDefinitionDisplayName"),
("checkRunAsAt", "checkRunAsAt"),
("resultType", "resultType"),
("rulesetKey", "rulesetKey"),
("rulesetDisplayName", "rulesetDisplayName"),
("ruleKey", "ruleKey"),
("ruleDisplayName", "ruleDisplayName"),
("ruleDescription", "ruleDescription"),
("ruleFormula", "ruleFormula"),
("resultId", "resultId"),
("entityType", "lusidEntityType"),
("instrumentAsAt", "lusidEntityAsAt"),
("instrumentEffectiveAt", "lusidEntityEffectiveAt"),
("instrumentScope", "lusidEntityScope"),
("identifierType", "lusidEntityIdentifierKey"),
("identifierValue", "lusidEntityIdentifierValue"),
("instrumentName", "lusidEntityDisplayName"),
("entityUniqueId", "lusidEntityUniqueId"),
("severity", "severity"),
("countRuleBreaches", "countRuleBreaches"),
("errorDetail", "errorDetail"),
]},
)
]
)
)
)
],
transitions=[
lwm.TaskTransitionDefinition(from_state="Pending", to_state="InProgress", trigger="start", action="run-checks"),
lwm.TaskTransitionDefinition(from_state="InProgress", to_state="Complete", trigger="no_exceptions"),
lwm.TaskTransitionDefinition(from_state="InProgress", to_state="ExceptionManagement", trigger="exceptions"),
lwm.TaskTransitionDefinition(from_state="ExceptionManagement", to_state="Complete", trigger="resolve", guard="childTasks all (state eq 'Resolved')"),
lwm.TaskTransitionDefinition(from_state="InProgress", to_state="Error", trigger="error"),
]
)
)
Similar to step 1, we can inspect a visualised diagram of the task definition via Workflow Service > Tasks in the LUSID web app.
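The parent definition only populates an exception task correctly if every key in childTaskFields matches a field declared in the exception task definition from step 1. The sketch below is a local sanity check using plain dictionaries in the same shape as the request body; compare_fields is a hypothetical helper, and how LUSID itself treats a mismatched field name is not covered here.

```python
# Field names declared in the exception task definition's fieldSchema (step 1).
exception_schema = {
    "checkDefinitionScope", "checkDefinitionCode", "checkDefinitionDisplayName",
    "checkRunAsAt", "resultType", "rulesetKey", "rulesetDisplayName", "ruleKey",
    "ruleDisplayName", "ruleDescription", "ruleFormula", "severity", "resultId",
    "entityType", "instrumentAsAt", "instrumentEffectiveAt", "instrumentScope",
    "identifierType", "identifierValue", "instrumentName", "entityUniqueId",
    "countRuleBreaches", "errorDetail",
}

# Child task fields whose name differs from the worker result field they map from.
renamed = {
    "entityType": "lusidEntityType",
    "instrumentAsAt": "lusidEntityAsAt",
    "instrumentEffectiveAt": "lusidEntityEffectiveAt",
    "instrumentScope": "lusidEntityScope",
    "identifierType": "lusidEntityIdentifierKey",
    "identifierValue": "lusidEntityIdentifierValue",
    "instrumentName": "lusidEntityDisplayName",
    "entityUniqueId": "lusidEntityUniqueId",
}


def map_from(field):
    """Build a mapping entry in the same shape as the JSON request body."""
    return {"mapFrom": field, "setTo": None}


# Every field maps from the result field of the same name unless renamed above.
child_task_fields = {
    name: map_from(renamed.get(name, name)) for name in sorted(exception_schema)
}


def compare_fields(mapped_fields, schema_names):
    """Hypothetical helper: report mapped keys missing from the schema and vice versa."""
    mapped = set(mapped_fields)
    return {
        "not_in_schema": sorted(mapped - schema_names),
        "never_populated": sorted(schema_names - mapped),
    }


report = compare_fields(child_task_fields, exception_schema)
print(report)  # prints {'not_in_schema': [], 'never_populated': []}
```

Two empty lists mean the mapping and the schema agree. A misspelt key would appear under not_in_schema, and the field it was meant to fill would appear under never_populated.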
Step 3: Kick off a task test run
Now that we’ve modelled our workflow as a parent and child task definition, we can test the workflow via the LUSID web app. To do this:
Sign in to the LUSID web app and navigate to Workflow Service > Tasks.
Click on Task definition and select the DQ-Check-CheckInstruments definition we created in step 2.
Click Create task.

Specify the following in the form:
Target state: InProgress
Check Definition Scope: Finbourne-Examples
Check Definition Code: DQ-Check-instrument-properties
Instrument Scope: Finbourne-ExamplesDQ-Check
Preferred Identifier: Instrument/default/ClientInternal
Rule Breach Limit: 100

Click Create to run the DQ check task.
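If we later want to start runs without the web form, the form values above would need to be expressed as task fields. The sketch below only builds and prints a candidate request body; it does not call LUSID. The body shape (a taskDefinitionId plus a list of name/value fields) and the helper name build_create_task_body are assumptions for illustration and should be checked against the Workflow API reference before use.

```python
import json

# Values entered in the Create task form, keyed by the field names
# declared in the parent task definition's fieldSchema (step 2).
form_values = {
    "checkDefinitionScope": "Finbourne-Examples",
    "checkDefinitionCode": "DQ-Check-instrument-properties",
    "instrumentScope": "Finbourne-ExamplesDQ-Check",
    "preferredIdentifier": "Instrument/default/ClientInternal",
    "ruleBreachLimit": 100,
}


def build_create_task_body(scope, code, values):
    """Hypothetical helper: assemble a task creation body.

    ASSUMPTION: the request takes a taskDefinitionId and a list of
    name/value fields. Verify this against the API reference.
    """
    return {
        "taskDefinitionId": {"scope": scope, "code": code},
        "fields": [{"name": name, "value": value} for name, value in values.items()],
    }


body = build_create_task_body("Finbourne-Examples", "DQ-Check-CheckInstruments", form_values)
print(json.dumps(body, indent=2))
```

Keeping the values in one dictionary makes it straightforward to reuse the same inputs for the repeat run at the end of step 4.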
Step 4: Inspect the results
We can inspect the results of our run via the LUSID web app. To do this:
Navigate to Workflow Service > Dashboard and select the Descendants tab.

Click the Configuration icon.

Click Add column and select the task fields you want to monitor in the dashboard; we can choose any of the fields specified in our exception task definition, but we especially want the following:
resultType
instrumentName
identifierType
identifierValue
rulesetDisplayName
ruleDescription
severity

With the relevant columns selected and reordered in the dashboard, we can see that LUSID has created an exception task for each instrument that is missing the Instrument/Finbourne-Examples/Issuer property.
To investigate the exceptions, perform the following steps for each exception task:
Click Edit on the exception task and update the Target state to InProgress, indicating that you are in the process of investigating and rectifying the issue.

Rectify the missing data for the instrument using your preferred method. For this tutorial we’ll manually add the property via the LUSID web app.

Return to the Workflow Service > Dashboard screen and update the exception task Target state to Resolved.

Once we have updated every exception task to Resolved, the parent task automatically moves into the Complete state.
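This behaviour follows from two parts of the definitions: the resolve-parent action on the exception task, and the guard on the parent's resolve transition that requires all child tasks to be Resolved. The toy model below illustrates that interaction in plain Python. It is a sketch of the logic as we understand it from the definitions, not LUSID's implementation, and the Task class and function names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    state: str
    children: list["Task"] = field(default_factory=list)


# (from_state, trigger) -> to_state, copied from the parent definition in step 2.
PARENT_TRANSITIONS = {
    ("Pending", "start"): "InProgress",
    ("InProgress", "no_exceptions"): "Complete",
    ("InProgress", "exceptions"): "ExceptionManagement",
    ("ExceptionManagement", "resolve"): "Complete",
    ("InProgress", "error"): "Error",
}


def all_children_resolved(task: Task) -> bool:
    """Toy equivalent of the guard: childTasks all (state eq 'Resolved')."""
    return all(child.state == "Resolved" for child in task.children)


def fire(task: Task, trigger: str) -> bool:
    """Apply a trigger to the parent; return True if its state changed."""
    key = (task.state, trigger)
    target = PARENT_TRANSITIONS.get(key)
    if target is None:
        return False  # no transition for this trigger in the current state
    if key == ("ExceptionManagement", "resolve") and not all_children_resolved(task):
        return False  # guard blocks the transition
    task.state = target
    return True


def resolve_child(parent: Task, child: Task) -> None:
    """Resolve one exception, then trigger the parent (the resolve-parent action)."""
    child.state = "Resolved"
    fire(parent, "resolve")


parent = Task("ExceptionManagement", [Task("InProgress"), Task("InProgress")])

resolve_child(parent, parent.children[0])
print(parent.state)  # prints ExceptionManagement (one exception still open)

resolve_child(parent, parent.children[1])
print(parent.state)  # prints Complete
```

Each resolved exception attempts the parent transition, but only the last one satisfies the guard, which is why the parent stays in ExceptionManagement until every exception task is Resolved.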
To check we’ve adequately addressed each data issue, we can create another DQ check task for the same set of data; this run should complete with zero exception tasks.
Next steps: Automate runs of the workflow
Now that we’re happy with our DQ check tasks, we could:
Set up an event handler to automate runs of a DQ check after an integration run
Add more rules to the check definition to check additional data points