Setting up a data quality control to check new pricing data for outliers

In this tutorial we'll see how to use the Workflow Service to create, test and automate a data quality check that does the following: 

  1. Loads some quotes into the LUSID Quote Store from a CSV file in Drive.

  2. Checks the quotes for outliers using the 1.5xIQR rule, that is, quotes that fall below Q1 - 1.5 IQR or above Q3 + 1.5 IQR.

  3. Creates an exception task for each outlier quote so that it can be resolved manually.

Note: To complete this tutorial, you must have suitable access control permissions. This can most easily be achieved by assigning your LUSID user the built-in lusid-administrator role. This should already be the case if you are the domain owner.

For help with the Workflow Service terminology used in this tutorial, click here.

Step 1: Creating workers to perform operations in LUSID

Creating a worker that loads quotes into LUSID

The first thing we need to do is create the workers that the rest of the workflow is built around. Imagine we have a new CSV in Drive containing some quotes, including potential outliers:

ClientInternal,PriceDate,Value,Unit,ISIN
ClientInt1,2023-12-19T11:49:00Z,78.47247022,GBP,GB00BPQY8M80
ClientInt1,2023-12-19T11:53:00Z,135.25498129,GBP,GB00BPQY8M80
ClientInt1,2023-12-19T11:58:00Z,1.98389754,GBP,GB00BPQY8M80
ClientInt1,2023-12-19T11:59:00Z,93.66053179,GBP,GB00BPQY8M80
ClientInt1,2023-12-19T12:01:00Z,100.56023855,GBP,GB00BPQY8M80
ClientInt2,2023-12-19T11:26:00Z,55.92974253,GBP,GB0033986497
ClientInt2,2023-12-19T11:36:00Z,75.38473826,GBP,GB0033986497
ClientInt2,2023-12-19T11:39:00Z,49.64929302,GBP,GB0033986497
ClientInt2,2023-12-19T11:43:00Z,69.97645536,GBP,GB0033986497
ClientInt2,2023-12-19T11:44:00Z,66.97397658,GBP,GB0033986497
ClientInt3,2023-12-19T11:33:00Z,80.83720385,GBP,GB00BH4HKS39
ClientInt3,2023-12-19T11:38:00Z,110.83674829,GBP,GB00BH4HKS39
ClientInt3,2023-12-19T12:02:00Z,108.42751963,GBP,GB00BH4HKS39
ClientInt3,2023-12-19T12:09:00Z,100000.88549987,GBP,GB00BH4HKS39
ClientInt3,2023-12-19T12:10:00Z,96.97357754,GBP,GB00BH4HKS39
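
To make the shape of this file concrete, here is a minimal Python sketch of how each CSV row maps onto the quote fields that the Luminesce view later in this step populates. It is an illustration only and not part of the workflow; the function name to_quote_rows and the two-row SAMPLE_CSV excerpt are assumptions introduced for this example.

```python
import csv
import io

# Two rows taken from the example file above; the full file has the same columns.
SAMPLE_CSV = """ClientInternal,PriceDate,Value,Unit,ISIN
ClientInt1,2023-12-19T11:49:00Z,78.47247022,GBP,GB00BPQY8M80
ClientInt3,2023-12-19T12:09:00Z,100000.88549987,GBP,GB00BH4HKS39
"""


def to_quote_rows(csv_text, quote_scope):
    """Map each CSV row onto the quote fields used by the view in this step.

    Hypothetical helper for illustration; the real mapping is done in Luminesce.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        rows.append({
            "InstrumentId": record["ClientInternal"].strip(),
            "InstrumentIdType": "ClientInternal",
            "QuoteScope": quote_scope,
            "QuoteType": "Price",
            "Provider": "Lusid",
            "Field": "mid",
            "Unit": record["Unit"].strip(),
            "Value": float(record["Value"]),
            "QuoteEffectiveAt": record["PriceDate"].strip(),
        })
    return rows


quote_rows = to_quote_rows(SAMPLE_CSV, "Finbourne-Examples")
for row in quote_rows:
    print(row["InstrumentId"], row["QuoteEffectiveAt"], row["Value"], row["Unit"])
```

Note that the ISIN column is not used: quotes are keyed on the ClientInternal identifier.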

Our first worker needs to extract quotes like those above from a folder in Drive and upsert them to the LUSID Quote Store. Creating a worker consists of two steps:

  1. Creating a Luminesce custom view that can interact with LUSID data.
    We can use Luminesce to create the following custom view, which takes two parameters: the Drive folder to extract quotes from and the Quote Store scope to upsert each quote to:

    @x = use Sys.Admin.SetupView
    --provider=Worker.QuoteOutliers.ImportQuotesFromCsv
    --description="A worker that extracts quotes from CSV files stored in a specified folder in drive and upserts them to the Quote Store"
    --parameters
    FolderToScan,Text,"/quotes",true
    ScopeForQuotes,Text,"Finbourne-Examples",true
    ----
    
    -- Define constants
    
    @@scope = select #PARAMETERVALUE(ScopeForQuotes);
    
    @@file_filter = select '.*.csv';
    @@folder_to_scan = select #PARAMETERVALUE(FolderToScan);
    
    --Extract new price file dropped into Drive
    
    @extract_prices = use Drive.Csv with @@folder_to_scan, @@file_filter
    --file={@@folder_to_scan}
    --folderFilter={@@file_filter}
    --addFileName
    enduse;
    
    --Transform pricing extract into LUSID structure
    
    @transform_prices =
    select
    ClientInternal as InstrumentId,
    'ClientInternal' as InstrumentIdType,
    @@scope as QuoteScope,
    'Price' as QuoteType,
    'Lusid' as Provider,
    'mid' as Field,
    Unit as Unit,
    Value as Value,
    PriceDate as QuoteEffectiveAt
    from @extract_prices;
    
    --Load prices into the LUSID quote store
    
    @to_write = select *
    from Lusid.Instrument.Quote.Writer
    where ToWrite = @transform_prices;
    
    --Return any errors
    
    select * from @to_write where WriteErrorCode != 0;
    enduse;
    
    select * from @x;

  2. Passing the name of the view to the CreateWorker API to wrap its functionality in a Workflow Service worker.
    We can now create a worker that wraps the functionality of Worker.QuoteOutliers.ImportQuotesFromCsv so that our workflow can use it. To do so, we call the CreateWorker API, passing in the following:

    • A scope and code that together uniquely identify the worker.

    • A displayName and description for the worker.

    • A worker type of LuminesceView.

    • The name of the Luminesce custom view to be used.

    curl -X POST "https://<your-domain>.lusid.com/workflow/api/workers" \
      -H "Content-Type: application/json-patch+json" \
      -H "Authorization: Bearer <your-API-access-token>" \
      -d '{
      "id": {
        "scope": "Finbourne-Examples",
        "code": "Csv-Import-Quotes"
      },
      "displayName": "Import quotes from CSV",
      "description": "Worker that extracts quotes from a folder of CSV files in Drive and upserts them to the Quote Store.",
      "workerConfiguration": {
        "type": "LuminesceView",
        "name": "Worker.QuoteOutliers.ImportQuotesFromCsv"
      }
    }'
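
If we prefer to script this call rather than use curl, the same request can be assembled with Python's standard library. The following is a sketch only: the helper name build_create_worker_request and the example-domain and example-token values are placeholders we've introduced, while the URL path, headers and body fields are copied from the curl call above. The request is built but deliberately not sent.

```python
import json
import urllib.request


def build_create_worker_request(domain, token, scope, code,
                                display_name, description, view_name):
    """Assemble (but do not send) a CreateWorker request for a Luminesce view."""
    body = {
        "id": {"scope": scope, "code": code},
        "displayName": display_name,
        "description": description,
        "workerConfiguration": {"type": "LuminesceView", "name": view_name},
    }
    return urllib.request.Request(
        url=f"https://{domain}.lusid.com/workflow/api/workers",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json-patch+json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


request = build_create_worker_request(
    domain="example-domain",  # placeholder, use your own LUSID domain
    token="example-token",    # placeholder, use your own API access token
    scope="Finbourne-Examples",
    code="Csv-Import-Quotes",
    display_name="Import quotes from CSV",
    description="Worker that extracts quotes from a folder of CSV files "
                "in Drive and upserts them to the Quote Store.",
    view_name="Worker.QuoteOutliers.ImportQuotesFromCsv",
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would submit it; we leave that step out here so the sketch can be run without credentials.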

Read more on how to create a worker.

Creating a worker that checks quotes are within tolerance

Once we have upserted our quotes to the Quote Store, we can configure our workflow to kick off a second worker, which checks that all quotes for the instruments held in a given portfolio are within our specified tolerances.

For example, we might want to create a worker which returns outliers for a given instrument and date range with the 1.5xIQR rule, that is, quotes that fall below Q1 - 1.5 IQR or above Q3 + 1.5 IQR. 
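
To see what the rule does to our example data, here is a small Python sketch that applies it to the Value column of the CSV from earlier in this step. It pools the prices of all three instruments, as the view below does, and so assumes all three are held in the portfolio. The quantile helper uses linear interpolation between closest ranks, which is our assumption; the quantile function in Luminesce may use a different method and give slightly different limits.

```python
import math

# "Value" column of the example CSV, in file order.
PRICES = [
    78.47247022, 135.25498129, 1.98389754, 93.66053179, 100.56023855,
    55.92974253, 75.38473826, 49.64929302, 69.97645536, 66.97397658,
    80.83720385, 110.83674829, 108.42751963, 100000.88549987, 96.97357754,
]


def quantile(values, q):
    """Quantile by linear interpolation between closest ranks (assumed method)."""
    ordered = sorted(values)
    position = q * (len(ordered) - 1)
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


def iqr_limits(values, multiplier=1.5):
    """Return (lower, upper) limits: Q1 - m*IQR and Q3 + m*IQR."""
    q1 = quantile(values, 0.25)
    q3 = quantile(values, 0.75)
    spread = (q3 - q1) * multiplier
    return q1 - spread, q3 + spread


lower_limit, upper_limit = iqr_limits(PRICES)
# A price is an outlier if it lies outside the closed interval [lower, upper].
outliers = [p for p in PRICES if not lower_limit <= p <= upper_limit]
print(f"limits: {lower_limit:.2f} to {upper_limit:.2f}")
print("outliers:", outliers)
```

Under these assumptions the limits come out at roughly 14.45 and 158.52, so two prices are flagged: 1.98389754 for ClientInt1 and 100000.88549987 for ClientInt3.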

  1. As with our first worker, we start by creating a custom view. This one takes the start and end dates for the range of quotes, plus the scope and code of the portfolio we want to check quotes for:

    -- 1. Create view and set parameters
    @outlier_view = use Sys.Admin.SetupView
    --provider=Worker.QuoteOutliers.IqrCheck
    --parameters
    StartDate,DateTime,2023-06-01,true
    EndDate,DateTime,2024-01-01,true
    PfolioScope,Text,"Finbourne-Examples",true
    PfolioCode,Text,"PortfolioUk",true
    ----
    
    @@StartDate = select #PARAMETERVALUE(StartDate);
    @@EndDate = select #PARAMETERVALUE(EndDate);
    @@PfolioScope = select #PARAMETERVALUE(PfolioScope);
    @@PfolioCode = select #PARAMETERVALUE(PfolioCode);
    
    
    -- 2. Identify instruments in chosen portfolio
    @inst_data =  select
          i.ClientInternal
        from Lusid.Instrument.Equity i 
        join Lusid.Portfolio.Holding h
          on i.LusidInstrumentId = h.LusidInstrumentId
        where h.PortfolioScope = @@PfolioScope
          and h.PortfolioCode = @@PfolioCode;
    
    
    -- 3. Collect quotes for all instruments and generate time series
    @price_time_series = select
        i.ClientInternal,
        q.QuoteEffectiveAt as [PriceDate],
        q.Unit as [Currency],
        q.Value as [Price]
        from Lusid.Instrument.Quote q
        join @inst_data i
        on i.ClientInternal = q.InstrumentId
        where q.QuoteScope = 'Finbourne-Examples'
            and q.InstrumentIdType = 'ClientInternal'
            and q.QuoteType = 'Price'
            and q.QuoteEffectiveAt between @@StartDate and @@EndDate;      
    
    
    -- 4. Run IQR checks
    @iqr_data = select
        interquartile_range(price) * (1.5) as [iqr_x1_5],
        quantile(price, 0.25) as [q1],
        quantile(price, 0.75) as [q3]
        from @price_time_series;
    
    
    -- 5. Define upper and lower limit for our price check and print to console
    @@upper_limit = select (q3 + iqr_x1_5 ) from  @iqr_data;
    @@lower_limit = select (q1 - iqr_x1_5 ) from  @iqr_data;
    
    @@upper_limit_log = select print('Upper limit for outlier check: {X:00000} ', '', 'Logs', @@upper_limit);
    @@lower_limit_log = select print('Lower limit for outlier check: {X:00000} ', '', 'Logs', @@lower_limit);
    
    
    -- 6. Return all outlier prices between two dates for a portfolio
    select
        PriceDate,
        ClientInternal,
        @@upper_limit as [UpperLimit],
        @@lower_limit as [LowerLimit],
        Price,
        'Outlier' as Result
    from @price_time_series
    where price not between @@lower_limit and @@upper_limit;
    
    enduse;
    
    select * from @outlier_view;
    
  2. Just like the previous worker, we can now create a worker to wrap the functionality we've created in Worker.QuoteOutliers.IqrCheck into our workflow by calling the CreateWorker API, passing in the following:

    • A scope and code that together uniquely identify the worker.

    • A displayName and description for the worker.

    • A worker type of LuminesceView.

    • The name of the Luminesce custom view to be used.

    curl -X POST "https://<your-domain>.lusid.com/workflow/api/workers" \
      -H "Content-Type: application/json-patch+json" \
      -H "Authorization: Bearer <your-API-access-token>" \
      -d '{
      "id": {
        "scope": "Finbourne-Examples",
        "code": "Check-Quotes-For-Iqr-Outliers"
      },
      "displayName": "Quote Outliers IQR",
      "description": "Worker that checks quotes are within the 1.5xIQR rule and outputs outliers.",
      "workerConfiguration": {
        "type": "LuminesceView",
        "name": "Worker.QuoteOutliers.IqrCheck"
      }
    }'

Step 2: Modelling the workflow in a series of task definitions

Once we understand the functionality we are building our workflow around, we next need to map out the key parts of the workflow. Before doing anything else, we should know:

  • What states can a new task going through this workflow exist in?

  • What data does the workflow need to take in and pass between states?

  • Which states can a task transition between? What triggers each transition to occur, and what guard conditions must be met for the transition to succeed?

  • What action should be taken on completion of a state transition? Do we need to create any child task definitions?

Thinking about the answers to each of these questions, we can model the following workflow in a series of three task definitions:

  • An “import quotes from file” task definition to act as the parent task.

  • A “control outlier quotes” task definition which can be created as a child of the “import quotes from file” task.

  • An “exception” task definition which can be created as a child of the “control outlier quotes” task. Note that we'll create the “exception” task definition before the “control outlier quotes” task definition, so that the former can be referenced in the call to create the latter.

“Import quotes from file” task definition

To create the “import quotes from file” parent task definition, we need to call the CreateTaskDefinition API, passing in the following: 

Note: Once a task definition has been created, it can be updated at any point using the UpdateTaskDefinition endpoint. 

curl -X POST
 "https://<your-domain>.lusid.com/workflow/api/taskdefinitions" 
  -H "Authorization: Bearer <your-API-access-token>"
  -H "Content-Type: application/json-patch+json" 
  -d "{
  "id": {
    "scope": "Finbourne-Examples",
    "code": "Import-Quotes-From-File"
  },
  "displayName": "Import quotes with data quality checks",
  "description": "A workflow for importing quotes from a file in Drive and checking for quote outliers for portfolio holdings",
  
  # Define the states a new task going through this workflow can exist in
  "states": [
    { "name": "Pending" },
    { "name": "ImportingQuotes" },
    { "name": "Error" },
    { "name": "InControl" },
    { "name": "Done" }
  ],
  
  # Define the task input parameters
  "fieldSchema": [
    {
      "name": "folder",
      "type": "String"
    },
    {
      "name": "quoteScope",
      "type": "String"
    },
    {
      "name": "portfolioScope",
      "type": "String"
    },
    {
      "name": "portfolioCode",
      "type": "String"
    },
    {
      "name": "iqrQuoteRangeStartDate",
      "type": "DateTime"
    },
    {
      "name": "iqrQuoteRangeEndDate",
      "type": "DateTime"
    }
  ],
  
  # Define the default state a task should enter upon creation and any required input parameters to successfully reach this state
  "initialState": {
    "name": "Pending",
    "requiredFields": [
      "quoteScope"
    ]
  },
  
  # Define any triggers which can cause a state transition to occur in this part of the workflow
  "triggers": [
    {
      "name": "start",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "success",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "failure",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "complete",
      "trigger": {
        "type": "External"
      }
    }
  ],
  
  # Define all the state transitions which can occur in this part of the workflow, including for each transition:
  #   - The states the task should move from and to.
  #   - The trigger that causes the transition to occur.
  #   - Any guard conditions which must be met in order for the transition to complete.
  #   - Any actions which should be taken upon completion of the transition.
  "transitions": [
    {
      "fromState": "Pending",
      "toState": "ImportingQuotes",
      "trigger": "start",
      "action": "import-prices-worker"
    },
    {
      "fromState": "ImportingQuotes",
      "toState": "Error",
      "trigger": "failure"
    },
    {
      "fromState": "ImportingQuotes",
      "toState": "InControl",
      "trigger": "success",
      "action": "trigger-control-outliers-child-task"
    },
    {
      "fromState": "InControl",
      "toState": "Done",
      "trigger": "complete",
      "guard": "childTasks all (state eq 'Complete')"

    }
  ],
  
 # Define all the actions which can occur in the state transitions.
 #    Note: We can update our task definition at any time to add in any missing actions.

   "actions": [
  {
    "name": "import-prices-worker",
    "actionDetails" : {
      "type": "RunWorker",
      "workerId": {
        "scope": "Finbourne-Examples",
        "code": "Csv-Import-Quotes"
      },
      "workerStatusTriggers": {
        "failedToStart": "failure",
        "failedToComplete": "failure",
        "completedWithResults": "success",
        "completedNoResults": "success"
      },
      "workerParameters": {
        "FolderToScan": {
          "MapFrom": "folder",
          "SetTo": null
        },
        "ScopeForQuotes": {
          "MapFrom": "quoteScope",
          "SetTo": null
        }
      }
    }
  },
  {
    "name": "trigger-control-outliers-child-task",
    "actionDetails": {
      "type": "CreateChildTasks",
      "childTaskConfigurations": [
        {
          "taskDefinitionId": {
            "scope": "Finbourne-Examples",
            "code": "Control-Outlier-Quotes"
          },
          "childTaskFields": {
            "iqrQuoteRangeStartDate": {  "mapFrom": "iqrQuoteRangeStartDate"},
            "iqrQuoteRangeEndDate": {  "mapFrom": "iqrQuoteRangeEndDate"},
            "portfolioScope": {  "mapFrom": "portfolioScope"},
            "portfolioCode": {  "mapFrom": "portfolioCode"}
          },         
          "initialTrigger": "start"
        } 
      ]
    }
  }
  ]
}"
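
To check our understanding of this definition, we can walk through its transitions in a few lines of Python. This is a sketch of the state machine only, not of how the Workflow Service is implemented: the TRANSITIONS table is copied from the definition above, while the fire and run helpers, and the choice to leave a task in place when a guard is not met, are assumptions made for illustration.

```python
# (from_state, trigger) -> to_state, copied from the "transitions" list above.
TRANSITIONS = {
    ("Pending", "start"): "ImportingQuotes",
    ("ImportingQuotes", "failure"): "Error",
    ("ImportingQuotes", "success"): "InControl",
    ("InControl", "complete"): "Done",
}


def fire(state, trigger, child_states=()):
    """Return the state a task ends up in after a trigger is fired."""
    key = (state, trigger)
    if key not in TRANSITIONS:
        raise ValueError(f"No transition from {state!r} on trigger {trigger!r}")
    # Guard on the final transition: every child task must be 'Complete'.
    # With no children, all() is True here; the service may behave differently.
    if key == ("InControl", "complete") and not all(
        child == "Complete" for child in child_states
    ):
        return state  # guard not met, so the task stays put (assumed behaviour)
    return TRANSITIONS[key]


def run(triggers, child_states=()):
    """Fire a sequence of triggers on a new task, starting from 'Pending'."""
    state = "Pending"
    for trigger in triggers:
        state = fire(state, trigger, child_states)
    return state


happy_path = run(["start", "success", "complete"], child_states=["Complete"])
failed_import = run(["start", "failure"])
blocked = run(["start", "success", "complete"], child_states=["Exceptions"])
print(happy_path, failed_import, blocked)
```

The third case shows why the guard matters: while the child task is still handling exceptions, the parent cannot move to Done.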

“Exception” task definition

To create the “exception” child task definition, we need to call the CreateTaskDefinition API, passing in the following: 

curl -X POST
 "https://<your-domain>.lusid.com/workflow/api/taskdefinitions" 
  -H "Authorization: Bearer <your-API-access-token>"
  -H "Content-Type: application/json-patch+json" 
  -d "{
  "id": {
    "scope": "Finbourne-Examples",
    "code": "Quote-Outliers-Exception"
  },
  "displayName": "Quote outlier exception task",
  "description": "A workflow for handling exceptions found in quote outliers check",
  
  # Define the states a new task going through this workflow can exist in
  "states": [
    { "name": "Pending" },
    { "name": "InProgress" },
    { "name": "Resolved" }
  ],
  
  # Define the task input parameters
  "fieldSchema": [
    {
      "name": "clientInternal",
      "type": "String"
    },
    {
      "name": "price",
      "type": "Decimal"
    },
    {
      "name": "lowerLimit",
      "type": "Decimal"
    },
    {
      "name": "upperLimit",
      "type": "Decimal"
    },
    {
      "name": "priceDate",
      "type": "DateTime"
    }
  ],
  
  # Define the default state a task should enter upon creation and any required input parameters to successfully reach this state
  "initialState": {
    "name": "Pending",
    "requiredFields": [
      "clientInternal"
    ]
  },
  
  # Define any triggers which can cause a state transition to occur in this part of the workflow
  "triggers": [
    {
      "name": "start",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "resolve",
      "trigger": {
        "type": "External"
      }
    }
  ],
  
  # Define all the state transitions which can occur in this part of the workflow, including for each transition:
  #   - The states the task should move from and to.
  #   - The trigger that causes the transition to occur.
  #   - Any guard conditions which must be met in order for the transition to complete.
  #   - Any actions which should be taken upon completion of the transition.
  "transitions": [
    {
      "fromState": "Pending",
      "toState": "InProgress",
      "trigger": "start"
    },
    {
      "fromState": "InProgress",
      "toState": "Resolved",
      "trigger": "resolve",
      "action": "trigger-control-outliers-parent-task"
    }
  ],
  
 # Define all the actions which can occur in the state transitions.

   "actions": [
     {
       "name": "trigger-control-outliers-parent-task",
       "actionDetails" : {
         "type": "TriggerParentTask",
         "trigger": "resolved"
       }
     }
   ]
}"
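
Because states, triggers and actions are all referenced by name, a typo in any of them is easy to miss. Before submitting a definition, we could run a quick local sanity check like the sketch below. The check_task_definition helper is hypothetical and is not the validation the Workflow Service itself performs; the dictionary is a trimmed copy of the “exception” definition above, with the field schema left out.

```python
import copy


def check_task_definition(definition):
    """Return a list of naming problems found in a task definition dict."""
    states = {state["name"] for state in definition["states"]}
    triggers = {trigger["name"] for trigger in definition["triggers"]}
    actions = {action["name"] for action in definition.get("actions", [])}
    problems = []
    initial = definition["initialState"]["name"]
    if initial not in states:
        problems.append(f"initial state {initial!r} is not declared")
    for transition in definition["transitions"]:
        for key in ("fromState", "toState"):
            if transition[key] not in states:
                problems.append(f"{key} {transition[key]!r} is not declared")
        if transition["trigger"] not in triggers:
            problems.append(f"trigger {transition['trigger']!r} is not declared")
        action = transition.get("action")
        if action is not None and action not in actions:
            problems.append(f"action {action!r} is not declared")
    return problems


exception_definition = {
    "states": [{"name": "Pending"}, {"name": "InProgress"}, {"name": "Resolved"}],
    "initialState": {"name": "Pending", "requiredFields": ["clientInternal"]},
    "triggers": [
        {"name": "start", "trigger": {"type": "External"}},
        {"name": "resolve", "trigger": {"type": "External"}},
    ],
    "transitions": [
        {"fromState": "Pending", "toState": "InProgress", "trigger": "start"},
        {"fromState": "InProgress", "toState": "Resolved", "trigger": "resolve",
         "action": "trigger-control-outliers-parent-task"},
    ],
    "actions": [
        {"name": "trigger-control-outliers-parent-task",
         "actionDetails": {"type": "TriggerParentTask", "trigger": "resolved"}},
    ],
}

problems = check_task_definition(exception_definition)

# Introduce a deliberate typo to show what the check reports.
broken = copy.deepcopy(exception_definition)
broken["transitions"][1]["trigger"] = "resolved"
broken_problems = check_task_definition(broken)
print(problems, broken_problems)
```

Note that the trigger named inside the TriggerParentTask action (resolved) belongs to the parent task definition, so this check deliberately does not compare it against the exception task's own triggers.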

“Control outlier quotes” task definition

To create the “control outlier quotes” child task definition, we need to call the CreateTaskDefinition API, passing in the following:

curl -X POST
 "https://<your-domain>.lusid.com/workflow/api/taskdefinitions" 
  -H "Authorization: Bearer <your-API-access-token>"
  -H "Content-Type: application/json-patch+json" 
  -d "{
  "id": {
    "scope": "Finbourne-Examples",
    "code": "Control-Outlier-Quotes"
  },
  "displayName": "Check for quote outliers child task",
  "description": "A workflow for checking newly-loaded quotes are within the 1.5xIQR rule",
  
  # Define the states a new task going through this workflow can exist in
  "states": [
    { "name": "Pending" },
    { "name": "InProgress" },
    { "name": "Exceptions" },
    { "name": "Complete" }
  ],
  
  # Define the task input parameters
  "fieldSchema": [
    {
      "name": "portfolioScope",
      "type": "String"
    },
    {
      "name": "portfolioCode",
      "type": "String"
    },
    {
      "name": "iqrQuoteRangeStartDate",
      "type": "DateTime"
    },
    {
      "name": "iqrQuoteRangeEndDate",
      "type": "DateTime"
    }
  ],
  
  # Define the default state a task should enter upon creation and any required input parameters to successfully reach this state
  "initialState": {
    "name": "Pending",
    "requiredFields": [
      "portfolioScope",
      "portfolioCode"
    ]
  },
  
  # Define any triggers which can cause a state transition to occur in this part of the workflow
  "triggers": [
    {
      "name": "start",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "noExceptions",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "exceptionsFound",
      "trigger": {
        "type": "External"
      }
    },
    {
      "name": "resolved",
      "trigger": {
        "type": "External"
      }
    }
  ],
  
  # Define all the state transitions which can occur in this part of the workflow, including for each transition:
  #   - The states the task should move from and to.
  #   - The trigger that causes the transition to occur.
  #   - Any guard conditions which must be met in order for the transition to complete.
  #   - Any actions which should be taken upon completion of the transition.
  "transitions": [
    {
      "fromState": "Pending",
      "toState": "InProgress",
      "trigger": "start",
      "action": "quote-outliers-check-worker"

    },
    {
      "fromState": "InProgress",
      "toState": "Exceptions",
      "trigger": "exceptionsFound"
    },
    {
      "fromState": "Exceptions",
      "toState": "Complete",
      "trigger": "resolved",
      "action": "trigger-import-quotes-parent-task",
      "guard": "childTasks all (state eq 'Resolved')"

    },
    {
      "fromState": "InProgress",
      "toState": "Complete",
      "trigger": "noExceptions",
      "action": "trigger-import-quotes-parent-task"
    }
  ],
  
 # Define all the actions which can occur in the state transitions.
   "actions": [
  {
    "name": "quote-outliers-check-worker",
    "actionDetails" : {
      "type": "RunWorker",
      "workerId": {
        "scope": "Finbourne-Examples",
        "code": "Check-Quotes-For-Iqr-Outliers"
      },
      "workerStatusTriggers": {
        "completedWithResults": "exceptionsFound",
        "completedNoResults": "noExceptions"
      },
      "workerParameters": {
        "StartDate": {
          "MapFrom": "iqrQuoteRangeStartDate",
          "SetTo": null
        },
        "EndDate": {
          "MapFrom": "iqrQuoteRangeEndDate",
          "SetTo": null
        },
         "PfolioScope": {
          "MapFrom": "portfolioScope",
          "SetTo": null
        },
         "PfolioCode": {
          "MapFrom": "portfolioCode",
          "SetTo": null
        }
      },
      "childTaskConfigurations": [
        {
          "taskDefinitionId": {
            "scope": "Finbourne-Examples",
            "code": "Quote-Outliers-Exception"  
          },
          "initialTrigger": "start",
          "childTaskFields": {
            "clientInternal": {  "mapFrom": "ClientInternal" },
            "lowerLimit": {  "mapFrom": "LowerLimit" },
            "upperLimit": {  "mapFrom": "UpperLimit" },
            "price": {  "mapFrom": "Price" },
            "priceDate": {  "mapFrom": "PriceDate" }
          }
        }
      ]
    }
  },
  {
    "name": "trigger-import-quotes-parent-task",
    "actionDetails": {
      "type": "TriggerParentTask",
      "trigger": "complete"
    } 
  }
]
}"
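
The childTaskFields mapping above is what carries each outlier from the worker's results into an exception task. The Python sketch below illustrates that idea, assuming one exception task is created per result row. FIELD_MAPPING is copied from the definition; the child_task_fields helper is hypothetical, and the limit values in the example rows are placeholders rather than real worker output.

```python
# Exception task field -> worker result column, copied from "childTaskFields".
FIELD_MAPPING = {
    "clientInternal": "ClientInternal",
    "lowerLimit": "LowerLimit",
    "upperLimit": "UpperLimit",
    "price": "Price",
    "priceDate": "PriceDate",
}


def child_task_fields(result_rows):
    """Build the field values for one exception task per worker result row."""
    return [
        {field: row[column] for field, column in FIELD_MAPPING.items()}
        for row in result_rows
    ]


# Illustrative rows shaped like the output of Worker.QuoteOutliers.IqrCheck.
# Prices and dates come from the example CSV; the limits are placeholder values.
result_rows = [
    {"PriceDate": "2023-12-19T11:58:00Z", "ClientInternal": "ClientInt1",
     "UpperLimit": 160.0, "LowerLimit": 10.0,
     "Price": 1.98389754, "Result": "Outlier"},
    {"PriceDate": "2023-12-19T12:09:00Z", "ClientInternal": "ClientInt3",
     "UpperLimit": 160.0, "LowerLimit": 10.0,
     "Price": 100000.88549987, "Result": "Outlier"},
]

exception_fields = child_task_fields(result_rows)
for fields in exception_fields:
    print(fields)
```

The Result column has no entry in the mapping, so it is not passed on to the exception tasks.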

Read more on creating and updating task definitions.

Step 3: Kicking off a new task and examining the workflow in action

We can test the workflow manually at any time by uploading a CSV file to a folder in Drive and then creating a new task in the LUSID web app:

  1. Sign in to the LUSID web app and navigate to Data Management > Drive. For this test, let's create a new folder called quotes-examples and upload the CSV from step 1.
     

  2. Navigate to the Workflow Service > Tasks dashboard.
     

  3. Select the task definition we created earlier and then click Create task.
     

  4. Provide values for the mandatory and optional input fields, before selecting Save to set the task in motion. For this test, let's provide the following values:
     
    Once created, we can use the dashboard to monitor the status of the task and any subsequent child tasks:
     
    We can see that two exception tasks are created, indicating that the workflow captured the outlier quotes from our example CSV.

  5. Click on the ID for each exception task to reveal more information; we can then manually rectify the outlier quote Price, before setting the Target state to Resolved:
     
    Once every exception task is set to Resolved, the entire workflow shows as completed:
     

  6. Navigate to Data Management > Quotes and select the Finbourne-Examples quote scope to check the workflow has functioned as expected. In the Quotes dashboard, we can see:

    • The prices from our CSV in Drive have been successfully loaded into the Quote Store.

    • The two outlier quotes were captured for manual update to retain the quality of our quote data; the data can now be used to perform accurate valuations.

Read more on creating, updating and monitoring tasks.

Next steps

With the task definitions and workers in place, you could choose to further improve the workflow by:

  • Subscribing to the FileCreated event in the Notification Service and setting up a webhook notification to automate the creation of an “Import quotes from file” parent task whenever a new file is loaded into Drive.

  • Adding a worker that handles outlier quotes for you.

  • Adding more data quality checks to the workflow.

  • Adding a worker to the end of the “import quotes from file” task definition that immediately performs a portfolio valuation and generates a report.

For more example workflows, visit our GitHub repo.

Appendix A: Troubleshooting any failed tasks

<coming soon>