This reference article describes the processor types available in the dataflows editor.
Each processor is a single step in a pipeline. For an overview of how processors fit together in pipelines, see How custom dataflow integrations work.
Common settings
Every processor shares the following settings, regardless of type:
Setting | Description |
|---|---|
Output mode | What happens to records as they leave this processor. See Output modes. |
Input record type | The output record types this processor takes in records from |
Output record type | Records flow out of processors as this named record type |
Batch size | The maximum number of records the output flow can hold before upstream processors pause |
Ingest
The Ingest processor reads data from a source, parses it into records, and passes those records to the next processor. LUSID creates a lineage record for each file it processes.
Ingest is always the first processor in a pipeline.
Setting | Description |
|---|---|
Source type | Where to read data from: |
Parser type | The format of the source data: |
Path / pattern and Root path | The file path, URL, queue name, or collection (depending on the source type) to read from |
Boundary field | The field on upstream records whose value change triggers a new call to the data source. When specified, the processor buffers and sorts upstream records by the boundary field. |
For example, you could include the following segments in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "Ingest",
"Sequence": 1,
"OutputFlows": { "default": "ingested" },
"Config": {
"sourceType": "Drive",
"parserType": "Csv",
"path": "/integrations/jpm-imos/positions/*.csv",
"delimiter": "|",
"recordType": "ValuedHolding",
"batchSize": 500
}
}{
"ProcessorType": "Ingest",
"Sequence": 2,
"InputFlows": { "default": "portfolios" },
"OutputFlows": { "default": "positions" },
"Config": {
"sourceType": "Api",
"parserType": "Json",
"path": "https://api.custodian.com/v1/positions",
"auth": { "type": "bearer", "token": "SECRET:custodian-token" },
"boundaryField": "AccountId",
"inputDerivedQueryParams": [
{ "paramName": "account", "sourceField": "AccountId" }
]
}
}Map transform
The Map transform processor reshapes records:
Renames fields
Applies string transforms
Tags records with a type
Expands a single record into multiple records
Field mappings
Rules for renaming or transforming fields. Each rule specifies a source field, target field, and an optional transform function.
Setting | Description | |
|---|---|---|
Source field | The source file column you want to apply the transformation to | |
Target field | A field name for the transformed data | |
Transform | None | Simply renames the field without transforming any data |
Uppercase | Sets all field values to UPPERCASE | |
Lowercase | Sets all field values to lowercase | |
Trim | Removes surrounding whitespace from a field value, for example | |
Strip from plus | Removes everything from the first | |
Value maps
A list of key-value pairs for mapping specific values, which you can then reference in your value sources. Each list item should contain:
A unique reference Map name
A list of key-value pairs
For example, you could import the following mapping JSON to automatically populate the value maps settings:
{
"valueMaps": [
{
"map": {
"BUY": "Buy",
"SELL": "Sell",
"DIVIDEND": "StockDividend"
},
"name": "TransactionTypeMap"
},
{
"map": {
"Monthly": "1M",
"Quarterly": "3M",
"Semi-Annual": "6M",
"Annual": "1Y"
},
"name": "FrequencyMap"
}
]
}
Value sources
Rules for deriving new fields; supports taking the first non-empty value from a list, concatenating fields, setting default values, and mathematical expressions.
Setting | Description | |
|---|---|---|
Name | A name to use when referring to this value source | |
Source kind | Column | One or more columns from the source file that the value should be sourced from. Specifying a value conversion tells the processor how to combine multiple columns. |
Conditional | One or more rules containing complex logic for value transformations based on conditions. See conditional value sources. | |
Constant | Hardcode a specific value | |
Expression | A derivation formula that instructs the processor how to automatically calculate values from one or more fields. See supported operations. | |
MathematicalExpression | A formula that can use other value sources to output a value. See mathematical expression. | |
ValueMapLookup | The name of a value map to use for the value source | |
Best practice
Reuse value sources: Define value sources once and reference them multiple times.
Set default values for the processor to fallback on when it encounters unexpected values.
Reference: Date formats
One or more of the following date formats; the processor attempts to apply each format to the value in order until it’s successful:
yyyy-MM-dd(recommended)yyyy-MM-ddTHH:mm:ssZ(recommended)dd/MM/yyyyMM/dd/yyyyyyyy-MM-ddTHH:mm:ssdd-MMM-yyyyyyyyMMdd
Reference: Conditional value sources
Each rule should contain:
Condition: A LUSID filtering syntax expression to evaluate the value source against. It must use the format
<Value source> <Operator> <Specific value/source>.Supported operators
The integration framework supports the following operators in addition to the standard LUSID filtering syntax supported operators:
InMapKeys: Value is a Key in the defined value maps, for exampleTypeValue InMapKeys ValidTypesMapInMapValues: Value is a Value in the defined value maps, for exampleStatusValue InMapValues StatusMap
Result: The value to return if the condition is met; this can be any of the following:
A specific value
One of the other defined value sources

Reference: Column references
A list of columns from the source file that the value should be sourced from. Specifying a value conversion tells the processor how to combine multiple columns.
Reference: Value conversion
Specify how to combine multiple column references:
None: Takes the first Column references valueConcatenate: Joins multiple Column references values together using the specified SeparatorFirstNonEmpty: Uses the first non-empty Column references valueRegexExtraction: The processor applies a regex pattern to the value from either:The first column in Column references
If no Column references are specified, the first item listed in Value sources
Note
Regex patterns time out after 1 second; to avoid timeouts, use patterns that target specific locations (for example,
^ISIN:\\s*([A-Z0-9]{12})rather than.*ISIN.*).The framework only extracts the first parenthesised group in your pattern (if used).
Reference: Currency identifier
Check the box to add the prefix CCY_ to each value, for example USD → CCY_USD.
Reference: Separator
Defaults to a space (“ “).
The separator to use when Value conversion is set to Concatenate. For example, specifying _ as separator transforms the values FUND-001, US0378331005, 2025-03-10 → FUND-001_US0378331005_2025-03-10.
Reference: Mathematical expression
Specify a formula that can use other value sources to output a value. You can use any of the following in your formula:
Names of
ValueSourcesNumeric literals
+,-,*,/
Note
The framework evaluates strictly left-to-right - there is no operator precedence.
For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "MapTransform",
"Sequence": 2,
"InputFlows": { "default": "ingested" },
"OutputFlows": { "default": "mapped" },
"Config": {
"mappings": [
{ "source": "AccountNum", "target": "PortfolioCode", "transform": "TRIM" },
{ "source": "TickerSymbol", "target": "Ticker", "transform": "UPPERCASE" }
],
"recordTypeMappings": [
{ "recordType": "Cash", "condition": "InstrumentType == 'CASH'" },
{ "recordType": "Filtered", "condition": "Quantity == '0'" }
],
"recordExpansions": [
{
"when": "InstrumentType == 'FxForward'",
"tagField": "_Leg",
"emit": ["BuyLeg", "SellLeg"]
}
],
"ValueSources": [
{
"name": "Side",
"mathematicalExpression": "Units * 1",
"defaultValue": "Buy"
}
]
}
}Instrument resolver
The Instrument resolver processor looks up each record’s instrument in LUSID and adds the resolved LUID to the record.
Records flow out of the processor in one of two output flows:
One output flow for successfully resolved records
One output flow for records that could not be resolved
Setting | Description |
|---|---|
Identifier mode | Configure the processor to resolve against a single identifier type, or a prioritised fallback hierarchy - the processor uses the first identifier that resolves uniquely. |
Identifier type | The LUSID instrument identifier type to look up for each record. |
Identifier source | The field on the record containing the identifier value to look up in LUSID. |
Resolved LUID field | A name for the LUID field to attach to resolved records |
Resolved identifier type field | In |
Resolved output flow | The name of the output flow for successfully resolved records |
Unresolved output flow | The name of the output flow for records that could not be resolved |
Stamp fields | Additional LUSID instrument fields to read from LUSID and attach to resolved records |
Search scope | The LUSID scope to look up instruments in |
Sort key | The processor buffers and sorts records by this record field before upserting |
For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "InstrumentResolver",
"Sequence": 3,
"InputFlows": { "default": "mapped" },
"OutputFlows": { "resolved": "resolved", "unresolved": "to_create" },
"Config": {
"identifierHierarchy": [
{ "type": "Isin", "source": "ISIN" },
{ "type": "Cusip", "source": "CUSIP" },
{ "type": "Currency","source": "Currency" }
],
"stampFields": ["InstrumentType", "Name"],
"searchScope": "default",
"batchSize": 200
}
}Portfolio resolver
The Portfolio resolver processor looks up each record’s portfolio in LUSID and adds the resolved portfolio scope and code to the record.
Records flow out of the processor in one of two output flows:
One output flow for successfully resolved records
One output flow for records that could not be resolved
Setting | Description |
|---|---|
Resolution mode | Configure the processor to resolve against portfolios filtered by properties and a linked code, a portfolio group, or using each record’s portfolio code value. See resolution modes below. |
Portfolio code source | The field on the record containing the portfolio code value to look up in LUSID |
Resolved scope field | A name for the resolved portfolio scope field to attach to resolved records |
Resolved code field | A name for the resolved portfolio code field to attach to resolved records |
Resolved output flow | The name of the output flow for successfully resolved records |
Unresolved output flow | The name of the output flow for records that could not be resolved |
Resolution modes
Linked property: Fetches portfolios filtered by required properties, then matches records using a linked code property. Use this option when your portfolios have a property that maps to the source system’s account identifier.
Portfolio group: Resolves against portfolios in a specified portfolio group.
Direct: Resolves each record using its portfolio code value and a specified scope. Use this option when your records already contain the LUSID scope and code.
For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "PortfolioResolver",
"Sequence": 4,
"InputFlows": { "default": "resolved" },
"OutputFlows": { "resolved": "ready", "unresolved": "skipped" },
"Config": {
"resolutionMode": "LinkedProperty",
"portfolioCodeSource": "AccountNumber",
"linkedCodeProperty": "Portfolio/Finbourne-Examples/CustodianAccount",
"requiredProperties": [
{ "key": "Portfolio/Finbourne-Examples/Custodian", "value": "Acme" }
]
}
}Enrichment
The Enrichment processor adds extra fields to each record by looking up a value against mappings you specify in the processor configuration or an external API.
Setting | Description |
|---|---|
Enrichment source | Where to look up enrichment data: |
Enrichment key field | The name of a record field - the value of this record field is used as the lookup key |
On no match | What to do when no field match is found:
|
Default values | Field values to apply when On no match is set to |
Merge
The Merge processor combines multiple input flows into a single output flow.
Setting | Description |
|---|---|
Strategy | How to combine the input flows:
|
Merge key field | The name of the record field to sort by when Strategy is set to |
Query
The Query processor runs a SQL query against a Luminesce, Postgres or Snowflake database and emits each result row as a record. You can run this as either:
A standalone query with no input flows; or,
A dynamic query, where the processor receives records from an output flow and uses the field values from those records as query parameters
Setting | Description |
|---|---|
Database type |
|
Connection string | For non-Luminesce database types: the database connection string; supports |
SQL query | The parameterised SQL query to run |
Query name | A name to identify your query in logs |
Query timeout | How long to wait before the query times out |
Static parameters | One or more static key and value pairs to apply on every execution of the query |
Dynamic parameters | One or more parameters sourced from your upstream records; note the query runs once per batch of records |
Max rows | A safety limit on the number of rows the query can return; you can set the value to |
On empty input | What to do when no value exists for a dynamic parameter:
|
For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "Query",
"Sequence": 4,
"InputFlows": { "default": "trades" },
"OutputFlows": { "default": "instrument_meta" },
"Config": {
"databaseType": "Postgres",
"connectionString": "Host=ref.Finbourne-Examples.com;Database=ref;SslMode=Require",
"credentialsKey": "ref-pg",
"query": "SELECT isin, sector, country FROM reference.instruments WHERE isin = ANY(@isins)",
"dynamicParameters": [
{ "paramName": "isins", "sourceField": "ISIN", "distinct": true, "batchSize": 500 }
],
"recordType": "InstrumentMeta"
}
}Deduplication
More information is coming soon.
Data quality check
The Data quality check processor runs data quality rules you configure against each record. You can choose what happens when a rule fails:
Flag the record and continue
Stop the pipeline
Attach the errors to the record for downstream handling
Setting | Description |
|---|---|
Check type | The type of data quality validation to run |
Field | The record field this rule validates |
Fail mode | What to do when a rule fails:
|
Error message | A message to log if the rule fails |
Max value and Min value | Used for |
Allowed values | Used for |
Regex pattern | Used for |
For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:
{
"ProcessorType": "DQCheck",
"Sequence": 5,
"Config": {
"failMode": "flag",
"rules": [
{ "name": "isin-required", "check": "Required", "field": "ISIN", "when": "InstrumentType != 'CASH'" },
{ "name": "isin-format", "check": "Regex", "field": "ISIN", "pattern": "^[A-Z]{2}[A-Z0-9]{9}\\d$" },
{ "name": "qty-positive", "check": "GreaterThan", "field": "Quantity", "value": "0", "severity": "warning" },
{ "name": "ccy-allowed", "check": "In", "field": "Currency", "values": ["USD","EUR","GBP","JPY"] },
{ "name": "trade-unique", "check": "UniqueInBatch","field": "TradeId" },
{ "name": "trade-date", "check": "DateFormat", "field": "TradeDate", "format": "yyyyMMdd" }
]
}
}File export
The File export processor turns records into a file and writes the file to your specified destination. You can export files with the following formats:
CSV
JSON
XML
Fixed-width
Setting | Description |
|---|---|
Output format | The format of the output file: |
Include headers | When enabled and Output format is CSV, includes a header row in the file |
Delimiter | Used for CSV output files; the column delimiter |
Pretty print | When enabled, formats the JSON output file with indentation and line breaks for readability |
Column order | Used for CSV and fixed-width output files; an ordered list of field names to explicitly order the columns in the output file |
Destination type and Output path | Where to write the output file: |
Transform pipeline | Optionally compress the output before writing it to the destination: |
Continue after sink | When enabled, the processor passes the records to the next processor after exporting them as a file |
Record type filter | If specified, only process records with this record type attached |
Pass through unmatched | Passes records that do not match the Record type filter downstream unchanged (rather than dropping them) |
Transfer
The Transfer processor moves files from one location to another without parsing them. This can be useful for delivering files to an external system or encrypting files in transit.
LUSID keeps a log of each file the processor transfers.
Setting | Description |
|---|---|
Source type and Source path | Where to source the file from: |
Delete after transfer | When enabled, the processor deletes the file in the source location after it has been successfully transferred to the destination. If the processor is unable to delete the source file after transfer, it logs the failure as a warning. |
Destination type and Destination path | Where to move the file to: |
Upsert custom entities
The Upsert custom entities processor creates or updates custom entities in LUSID from records. If the custom entity type does not yet exist in your domain, LUSID creates it for you on the first run.
Setting | Description |
|---|---|
Entity type name | The custom entity type name |
Display name and Description | A friendly display name and description for the custom entity type |
Create type if not exists | When enabled, creates the custom entity type definition automatically on the first run |
Identifiers | At least one identifier for the custom entity, comprised of a static |
Field schema | The definitions for each field on the custom entity type; see Defining a custom entity type |
Field mappings | Where to source the values for each custom entity field |
Property mappings | LUSID properties to decorate onto custom entities with a value from your specified source field |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as custom entities in LUSID |
Upsert holdings
The Upsert holdings processor creates or updates holdings in LUSID from records. You can process holdings in two ways:
Adjust holdings: Add to the existing portfolio holdings
Set holdings: Replace the existing portfolio holdings
Setting | Description |
|---|---|
Mode | How the holdings you upsert should behave with your existing holdings in LUSID: either |
Portfolio scope | The LUSID portfolio scope to write holdings into |
Portfolio code source | The record field to source the portfolio code from |
EffectiveAt source | The record field to source the effectiveAt datetime from |
EffectiveAt format | If the effectiveAt value is not ISO 8601-formatted, specify a date format (as listed in date formats) |
Instrument identifier type | The three-stage LUSID instrument identifier key, for example |
<Holdings field> source | The record field to populate each holding value from; read more on LUSID’s holdings fields |
Property mappings | Each property mapping configures the processor to decorate a LUSID property onto holdings with a value from your specified source field |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as holdings in LUSID |
Aggregate duplicates by instrument | When enabled in |
Sort key | In |
Upsert instrument
The Upsert instrument processor creates or updates instrument definitions in LUSID from records. You can assign instrument types by condition, with a fallback type for records that don’t match a condition.
Setting | Description |
|---|---|
Identifier mapping source field | The record field or value source for the identifier |
LUSID identifier type | The LUSID instrument identifier type to populate with a value from the source field |
Name source field | The record field to use as the instrument name |
Default instrument type | A fallback instrument type for the processor to assign when no type condition matches |
Domestic currency source | The record field to use as the instrument’s domestic currency |
Property mappings | Each property mapping configures the processor to decorate a LUSID property onto instruments with a value from your specified source field |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as instruments in LUSID |
Resolved LUID field | When Continue after sink is enabled, uses this name for the field containing the record’s resolved LUID |
Upsert legal entities
The Upsert legal entities processor creates or updates legal entities in LUSID from records. Before upserting, the processor deduplicates records within a batch by their idTypeScope.
Setting | Description |
|---|---|
Identifier scope | The |
Identifier type | The |
Identifier value source | The record field to use as the legal entity’s |
Display name source and Description source | The record fields to use as the legal entity’s display name and description respectively |
Additional identifiers | Additional identifiers to write alongside the primary identifier, as described here |
Property mappings | Each property mapping configures the processor to decorate a LUSID property onto legal entities with a value from your specified source field |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as legal entities in LUSID |
Upsert quote
More information is coming soon.
Upsert relational dataset
The Upsert relational dataset processor creates or updates data points in a LUSID relational dataset from records. If the dataset definition does not yet exist in your domain, the processor creates it automatically on the first run.
Setting | Description |
|---|---|
Scope and Code | The relational dataset definition scope and code |
Display name and Description | A friendly display name and description for the dataset |
Create if not exists | When enabled, creates the relational dataset definition automatically on the first run |
Column definitions | The field schema for the relational dataset definition: Key columns become series identifiers; non-key columns become value fields. See Anatomy of a dataset. |
Entity type | The type of entity to associate each dataset record with; see applicableEntityTypes |
Column mappings | Where to source the values for each data point field; see How do I add records to a dataset? |
Data series scope | A series scope for each record; see Constructing the primary key |
EffectiveAt source | The record field to source the effectiveAt datetime from |
EffectiveAt default | If both EffectiveAt default and source are not set, defaults to the current UTC time |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as dataset records in LUSID |
Upsert transaction
The Upsert transaction processor creates or updates transactions in LUSID from records. The processor groups and upserts records by portfolio.
Setting | Description |
|---|---|
Portfolio scope | The LUSID portfolio scope to write transactions into |
Portfolio code source | The record field to source the portfolio code from |
<Transaction field> source | The record field to populate each value on a transaction from; read more on LUSID’s transaction fields |
Property mappings | Each property mapping configures the processor to decorate a LUSID property onto transactions with a value from your specified source field |
Continue after sink | When enabled, the processor passes the records to the next processor after upserting them as transactions in LUSID |
Record type filter | If specified, only process records with this record type attached |
Pass through unmatched | Passes records that do not match the Record type filter downstream unchanged (rather than dropping them) |
Notification
The Notification processor sends an email notification at the end of a dataflow run. You can choose whether to send on success, failure, or always, and attach a summary of what the run processed.
Setting | Description |
|---|---|
SMTP server | The hostname, port, username, and password for your SMTP server |
Use SSL/TLS | Disable this for local testing tools only |
From | The sender email address |
To | Up to 10 direct recipient email addresses |
CC | Up to 10 carbon copy recipient email addresses |
BCC | Up to 10 blind carbon copy recipient email addresses |
Subject | The email subject line; you can use template tokens to make the text more meaningful |
Body | The email body as plain text or HTML; supports template tokens |
HTML email | When enabled, the processor sends the email as formatted HTML instead of plain text |
Send condition | When to send this notification based on the run state of the dataflow instance: |
Attach flow summary | When enabled, attaches a summary of the run’s record counts to the email as a CSV |
Workflow
The Workflow processor creates tasks in the Workflow Service from records - one task per record. You can filter which records trigger a task, assign tasks to specific users or teams, and map record fields onto task fields.
Setting | Description |
|---|---|
Task definition scope and Task definition code | The scope and code of the task definition to kick off a task from |
Initial state | Set the workflow state for new tasks |
Due date offset | Sets the task due date to the run date plus this number of days and hours |
Condition | A LUSID filtering syntax expression that evaluates to true or false; records that do not evaluate to true pass through the processor with no task created |
Record type filter | If specified, only process records with this record type attached |
Field mappings | Where to source the values for each task field |