Processor types for custom dataflow integrations

This reference article describes the processor types available in the dataflows editor.

Each processor is a single step in a pipeline. For an overview of how processors fit together in pipelines, see How custom dataflow integrations work.

Common settings

Every processor shares the following settings, regardless of type:

Setting

Description

Output mode

What happens to records as they leave this processor. See Output modes.

Input record type

The upstream record types from which this processor accepts records

Output record type

The named record type that this processor assigns to the records it emits

Batch size

The maximum number of records the output flow can hold before upstream processors pause

Ingest

The Ingest processor reads data from a source, parses it into records, and passes those records to the next processor. LUSID creates a lineage record for each file it processes.

Ingest is always the first processor in a pipeline.

Setting

Description

Source type

Where to read data from: Drive, Sftp, Api, S3, ActiveMQ, MongoDB, Lusid, Postgres, or Snowflake

Parser type

The format of the source data: Csv, Json, Xml, FixedWidth, Excel, Fix, or SwiftMt

Path / pattern and Root path

The file path, URL, queue name, or collection (depending on the source type) to read from

Boundary field

The field on upstream records whose value change triggers a new call to the data source. When specified, the processor buffers and sorts upstream records by the boundary field.

For example, you could include the following segments in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "Ingest",
  "Sequence": 1,
  "OutputFlows": { "default": "ingested" },
  "Config": {
    "sourceType": "Drive",
    "parserType": "Csv",
    "path": "/integrations/jpm-imos/positions/*.csv",
    "delimiter": "|",
    "recordType": "ValuedHolding",
    "batchSize": 500
  }
}

{
  "ProcessorType": "Ingest",
  "Sequence": 2,
  "InputFlows": { "default": "portfolios" },
  "OutputFlows": { "default": "positions" },
  "Config": {
    "sourceType": "Api",
    "parserType": "Json",
    "path": "https://api.custodian.com/v1/positions",
    "auth": { "type": "bearer", "token": "SECRET:custodian-token" },
    "boundaryField": "AccountId",
    "inputDerivedQueryParams": [
      { "paramName": "account", "sourceField": "AccountId" }
    ]
  }
}
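
As a rough illustration of the Boundary field behaviour described above, the following Python sketch buffers upstream records, sorts them by the boundary field, and makes one source call each time the boundary value changes. The fetch_positions function and the record contents are hypothetical stand-ins, not part of LUSID.

```python
from itertools import groupby
from operator import itemgetter


def calls_by_boundary(records, boundary_field, fetch):
    """Sort buffered records by the boundary field, then call the
    source once each time the boundary value changes."""
    ordered = sorted(records, key=itemgetter(boundary_field))
    results = []
    for value, group in groupby(ordered, key=itemgetter(boundary_field)):
        # One call per distinct boundary value (fetch is a hypothetical callable).
        results.extend(fetch(value, list(group)))
    return results


calls = []


def fetch_positions(account_id, upstream):
    # Stand-in for a call to the data source; notes which account was requested.
    calls.append(account_id)
    return [{"AccountId": account_id, "UpstreamCount": len(upstream)}]


upstream = [{"AccountId": "B"}, {"AccountId": "A"}, {"AccountId": "B"}]
out = calls_by_boundary(upstream, "AccountId", fetch_positions)
```

Here three upstream records produce only two calls, one for account A and one for account B.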

Map transform

The Map transform processor reshapes records:

  • Renames fields

  • Applies string transforms

  • Tags records with a type

  • Expands a single record into multiple records

Field mappings

Rules for renaming or transforming fields. Each rule specifies a source field, target field, and an optional transform function.

Setting

Description

Source field

The source file column you want to apply the transformation to

Target field

A field name for the transformed data

Transform

None

Simply renames the field without transforming any data

Uppercase

Sets all field values to UPPERCASE

Lowercase

Sets all field values to lowercase

Trim

Removes surrounding whitespace from a field value, for example “  AAPL  ” → “AAPL”

Strip from plus

Removes everything from the first + character onwards
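
The transforms listed above can be approximated in a few lines of Python. This is a minimal sketch of the described behaviour rather than the framework's implementation; the apply_transform function is hypothetical and uses the display names of the transforms as keys.

```python
def apply_transform(value: str, transform: str) -> str:
    """Apply one of the documented string transforms to a field value."""
    if transform == "None":
        return value  # rename only, data unchanged
    if transform == "Uppercase":
        return value.upper()
    if transform == "Lowercase":
        return value.lower()
    if transform == "Trim":
        return value.strip()  # remove surrounding whitespace
    if transform == "Strip from plus":
        # Keep only the text before the first '+' character.
        return value.split("+", 1)[0]
    raise ValueError(f"Unknown transform: {transform}")


trimmed = apply_transform("  AAPL  ", "Trim")
stripped = apply_transform("AAPL+US+1", "Strip from plus")
```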

Value maps

A list of key-value pairs for mapping specific values, which you can then reference in your value sources. Each list item should contain:

  • A unique reference Map name

  • A list of key-value pairs

For example, you could import the following mapping JSON to automatically populate the value maps settings:

{
  "valueMaps": [
    {
      "map": {
        "BUY": "Buy",
        "SELL": "Sell",
        "DIVIDEND": "StockDividend"
      },
      "name": "TransactionTypeMap"
    },
    {
      "map": {
        "Monthly": "1M",
        "Quarterly": "3M",
        "Semi-Annual": "6M",
        "Annual": "1Y"
      },
      "name": "FrequencyMap"
    }
  ]
}

Value sources

Rules for deriving new fields; supports taking the first non-empty value from a list, concatenating fields, setting default values, and mathematical expressions.

Setting

Description

Name

A name to use when referring to this value source

Source kind

Column

One or more columns from the source file that the value should be sourced from. Specifying a value conversion tells the processor how to combine multiple columns.

Conditional

One or more rules containing complex logic for value transformations based on conditions. See conditional value sources.

Constant

Hardcode a specific value

Expression

A derivation formula that instructs the processor how to automatically calculate values from one or more fields. See supported operations.

MathematicalExpression

A formula that can use other value sources to output a value. See mathematical expression.

ValueMapLookup

The name of a value map to use for the value source

Best practice

Reference: Date formats

One or more of the following date formats; the processor attempts to apply each format to the value in order until it’s successful:

  • yyyy-MM-dd (recommended)

  • yyyy-MM-ddTHH:mm:ssZ (recommended)

  • dd/MM/yyyy

  • MM/dd/yyyy

  • yyyy-MM-ddTHH:mm:ss

  • dd-MMM-yyyy

  • yyyyMMdd
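
To see why the order of the formats matters, consider the following Python sketch of the try-each-format-in-order behaviour. The mapping from the listed format strings to Python strptime codes is an assumption made for illustration, and parse_date is a hypothetical helper.

```python
from datetime import datetime

# Assumed Python strptime equivalents of the listed formats.
FORMATS = {
    "yyyy-MM-dd": "%Y-%m-%d",
    "yyyy-MM-ddTHH:mm:ssZ": "%Y-%m-%dT%H:%M:%SZ",
    "dd/MM/yyyy": "%d/%m/%Y",
    "MM/dd/yyyy": "%m/%d/%Y",
    "yyyy-MM-ddTHH:mm:ss": "%Y-%m-%dT%H:%M:%S",
    "dd-MMM-yyyy": "%d-%b-%Y",
    "yyyyMMdd": "%Y%m%d",
}


def parse_date(value, formats):
    """Try each configured format in order; return the first successful parse."""
    for name in formats:
        try:
            return datetime.strptime(value, FORMATS[name])
        except ValueError:
            continue  # this format did not fit, try the next one
    return None  # no configured format matched


day_first = parse_date("03/04/2025", ["dd/MM/yyyy", "MM/dd/yyyy"])
month_first = parse_date("03/04/2025", ["MM/dd/yyyy", "dd/MM/yyyy"])
```

The same input yields 3 April or 4 March depending on which format is listed first, which is one reason to prefer the unambiguous recommended formats.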

Reference: Conditional value sources

Each rule should contain:

  • Condition: A LUSID filtering syntax expression to evaluate the value source against. It must use the format <Value source> <Operator> <Specific value/source>.

    Supported operators

    The integration framework supports the following operators in addition to the standard LUSID filtering syntax supported operators:

    • InMapKeys: Value is a Key in the defined value maps, for example TypeValue InMapKeys ValidTypesMap

    • InMapValues: Value is a Value in the defined value maps, for example StatusValue InMapValues StatusMap

  • Result: The value to return if the condition is met; this can be any of the following:

    • A specific value

    • One of the other defined value sources
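
The InMapKeys and InMapValues operators described above can be pictured as simple membership tests. The Python sketch below is illustrative only: the map contents are made up, the helper names are hypothetical, and it assumes that the first rule whose condition is met supplies the result.

```python
# Hypothetical value maps, keyed by map name.
VALUE_MAPS = {
    "ValidTypesMap": {"EQ": "Equity", "BD": "Bond"},
    "StatusMap": {"A": "Active", "C": "Closed"},
}


def in_map_keys(value, map_name, value_maps=VALUE_MAPS):
    """True when the value is a key in the named value map."""
    return value in value_maps[map_name]


def in_map_values(value, map_name, value_maps=VALUE_MAPS):
    """True when the value is a value in the named value map."""
    return value in value_maps[map_name].values()


def conditional_value(rules, sources):
    """Return the result of the first rule whose condition holds (assumed ordering).

    A result is either the name of another value source or a specific value.
    """
    for condition, result in rules:
        if condition(sources):
            return sources.get(result, result)
    return None


rules = [
    (lambda s: in_map_keys(s["TypeValue"], "ValidTypesMap"), "Valid"),
    (lambda s: True, "Fallback"),
]
known = conditional_value(rules, {"TypeValue": "EQ", "Fallback": "Unknown"})
unknown = conditional_value(rules, {"TypeValue": "XX", "Fallback": "Unknown"})
```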

Reference: Column references

A list of columns from the source file that the value should be sourced from. Specifying a value conversion tells the processor how to combine multiple columns.

Reference: Value conversion

Specify how to combine multiple column references:

  • None: Takes the first Column references value

  • Concatenate: Joins multiple Column references values together using the specified Separator

  • FirstNonEmpty: Uses the first non-empty Column references value

  • RegexExtraction: The processor applies a regex pattern to the value from either:

    • The first column in Column references

    • If no Column references are specified, the first item listed in Value sources

    Note

    • Regex patterns time out after 1 second; to avoid timeouts, use patterns that target specific locations (for example, ^ISIN:\\s*([A-Z0-9]{12}) rather than .*ISIN.*).

    • The framework only extracts the first parenthesised group in your pattern (if used).
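
The RegexExtraction notes above can be illustrated with Python's re module. This is a sketch under two assumptions: that a pattern without a parenthesised group yields the whole match, and that a non-matching value yields nothing. The one-second timeout is not modelled, and the pattern is written with a single backslash because it is a Python raw string.

```python
import re


def regex_extract(pattern, value):
    """Return the first parenthesised group of the first match, if any."""
    match = re.search(pattern, value)
    if match is None:
        return None
    if match.re.groups >= 1:
        return match.group(1)  # only the first group is extracted
    return match.group(0)  # assumption: no group means the whole match


isin = regex_extract(r"^ISIN:\s*([A-Z0-9]{12})", "ISIN: US0378331005 (Apple Inc)")
first_only = regex_extract(r"([A-Z]{2})([0-9]+)", "US0378")
```

Anchoring the pattern with ^ lets the engine reject non-matching values quickly, whereas a pattern such as .*ISIN.* has to scan and backtrack across the whole value.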

Reference: Currency identifier

Check the box to add the prefix CCY_ to each value, for example USD → CCY_USD.

Reference: Separator

The separator to use when Value conversion is set to Concatenate. Defaults to a space (“ ”).

For example, specifying _ as the separator transforms the values FUND-001, US0378331005, and 2025-03-10 into FUND-001_US0378331005_2025-03-10.
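
The None, Concatenate, and FirstNonEmpty conversions can be sketched in Python as follows. The convert function is hypothetical; it assumes the column values arrive as a list of strings in Column references order and treats only the empty string and missing values as empty.

```python
def convert(values, conversion, separator=" "):
    """Combine column values according to the chosen value conversion."""
    if conversion == "None":
        return values[0] if values else None  # first column value only
    if conversion == "Concatenate":
        return separator.join(values)  # separator defaults to a space
    if conversion == "FirstNonEmpty":
        return next((v for v in values if v not in (None, "")), None)
    raise ValueError(f"Unknown conversion: {conversion}")


columns = ["FUND-001", "US0378331005", "2025-03-10"]
joined = convert(columns, "Concatenate", separator="_")
fallback = convert(["", "US0378331005"], "FirstNonEmpty")
```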

Reference: Mathematical expression

Specify a formula that can use other value sources to output a value. You can use any of the following in your formula:

  • Names of ValueSources

  • Numeric literals

  • +, -, *, /

Note

The framework evaluates strictly left-to-right - there is no operator precedence.
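
Strict left-to-right evaluation gives different answers from conventional arithmetic whenever + or - comes before * or /. The Python sketch below illustrates the rule; it assumes tokens are separated by spaces, and evaluate_left_to_right is a hypothetical helper rather than the framework's parser.

```python
def evaluate_left_to_right(expression, sources):
    """Evaluate a formula strictly left to right, ignoring operator precedence."""
    tokens = expression.split()

    def operand(token):
        # A token is either the name of a value source or a numeric literal.
        return float(sources[token]) if token in sources else float(token)

    result = operand(tokens[0])
    for op, token in zip(tokens[1::2], tokens[2::2]):
        rhs = operand(token)
        if op == "+":
            result += rhs
        elif op == "-":
            result -= rhs
        elif op == "*":
            result *= rhs
        elif op == "/":
            result /= rhs
        else:
            raise ValueError(f"Unsupported operator: {op}")
    return result


left_to_right = evaluate_left_to_right("2 + 3 * 4", {})  # (2 + 3) * 4
conventional = 2 + 3 * 4  # 2 + (3 * 4) under normal precedence
```

Here left_to_right is 20.0 while conventional is 14. Because the formula syntax listed above has no parentheses, one way to force a different grouping is to compute the inner part in a separate value source and reference it by name.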

For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "MapTransform",
  "Sequence": 2,
  "InputFlows": { "default": "ingested" },
  "OutputFlows": { "default": "mapped" },
  "Config": {
    "mappings": [
      { "source": "AccountNum", "target": "PortfolioCode", "transform": "TRIM" },
      { "source": "TickerSymbol", "target": "Ticker", "transform": "UPPERCASE" }
    ],
    "recordTypeMappings": [
      { "recordType": "Cash", "condition": "InstrumentType == 'CASH'" },
      { "recordType": "Filtered", "condition": "Quantity == '0'" }
    ],
    "recordExpansions": [
      {
        "when": "InstrumentType == 'FxForward'",
        "tagField": "_Leg",
        "emit": ["BuyLeg", "SellLeg"]
      }
    ],
    "ValueSources": [
      {
        "name": "Side",
        "mathematicalExpression": "Units * 1",
        "defaultValue": "Buy"
      }
    ]
  }
}

Instrument resolver

The Instrument resolver processor looks up each record’s instrument in LUSID and adds the resolved LUID to the record.

Records flow out of the processor in one of two output flows:

  • One output flow for successfully resolved records

  • One output flow for records that could not be resolved

Setting

Description

Identifier mode

Configure the processor to resolve against a single identifier type, or a prioritised fallback hierarchy - the processor uses the first identifier that resolves uniquely.

Identifier type

The LUSID instrument identifier type to look up for each record.

Identifier source

The field on the record containing the identifier value to look up in LUSID.

Resolved LUID field

A name for the LUID field to attach to resolved records

Resolved identifier type field

In Hierarchy mode, a name for the field that records which identifier type matched

Resolved output flow

The name of the output flow for successfully resolved records

Unresolved output flow

The name of the output flow for records that could not be resolved

Stamp fields

Additional LUSID instrument fields to read from LUSID and attach to resolved records

Search scope

The LUSID scope to look up instruments in

Sort key

The processor buffers and sorts records by this record field before upserting

For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "InstrumentResolver",
  "Sequence": 3,
  "InputFlows": { "default": "mapped" },
  "OutputFlows": { "resolved": "resolved", "unresolved": "to_create" },
  "Config": {
    "identifierHierarchy": [
      { "type": "Isin", "source": "ISIN" },
      { "type": "Cusip", "source": "CUSIP" },
      { "type": "Currency", "source": "Currency" }
    ],
    "stampFields": ["InstrumentType", "Name"],
    "searchScope": "default",
    "batchSize": 200
  }
}
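
The fallback hierarchy can be pictured as a loop over identifier types that stops at the first unique match. The following Python sketch is illustrative only: the in-memory MASTER dictionary stands in for LUSID's instrument master, and the ResolvedLuid and ResolvedIdentifierType field names are hypothetical.

```python
# Hypothetical instrument master: (identifier type, value) -> matching LUIDs.
MASTER = {
    ("Isin", "US0378331005"): ["LUID_0001"],
    ("Cusip", "037833100"): ["LUID_0001", "LUID_0002"],  # ambiguous
    ("Cusip", "594918104"): ["LUID_0003"],
}

HIERARCHY = [
    {"type": "Isin", "source": "ISIN"},
    {"type": "Cusip", "source": "CUSIP"},
]


def resolve(record, hierarchy, master):
    """Return (flow, record), using the first identifier that resolves uniquely."""
    for entry in hierarchy:
        value = record.get(entry["source"])
        if not value:
            continue  # record has no value for this identifier
        matches = master.get((entry["type"], value), [])
        if len(matches) == 1:
            stamped = {
                **record,
                "ResolvedLuid": matches[0],
                "ResolvedIdentifierType": entry["type"],
            }
            return "resolved", stamped
    return "unresolved", record


by_isin = resolve({"ISIN": "US0378331005", "CUSIP": "037833100"}, HIERARCHY, MASTER)
by_cusip = resolve({"ISIN": "XX0000000000", "CUSIP": "594918104"}, HIERARCHY, MASTER)
ambiguous = resolve({"CUSIP": "037833100"}, HIERARCHY, MASTER)
```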

Portfolio resolver

The Portfolio resolver processor looks up each record’s portfolio in LUSID and adds the resolved portfolio scope and code to the record.

Records flow out of the processor in one of two output flows:

  • One output flow for successfully resolved records

  • One output flow for records that could not be resolved

Setting

Description

Resolution mode

Configure the processor to resolve against portfolios filtered by properties and a linked code, a portfolio group, or using each record’s portfolio code value. See resolution modes below.

Portfolio code source

The field on the record containing the portfolio code value to look up in LUSID

Resolved scope field

A name for the resolved portfolio scope field to attach to resolved records

Resolved code field

A name for the resolved portfolio code field to attach to resolved records

Resolved output flow

The name of the output flow for successfully resolved records

Unresolved output flow

The name of the output flow for records that could not be resolved

Resolution modes

  • Linked property: Fetches portfolios filtered by required properties, then matches records using a linked code property. Use this option when your portfolios have a property that maps to the source system’s account identifier.

  • Portfolio group: Resolves against portfolios in a specified portfolio group.

  • Direct: Resolves each record using its portfolio code value and a specified scope. Use this option when your records already contain the LUSID scope and code.

For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "PortfolioResolver",
  "Sequence": 4,
  "InputFlows": { "default": "resolved" },
  "OutputFlows": { "resolved": "ready", "unresolved": "skipped" },
  "Config": {
    "resolutionMode": "LinkedProperty",
    "portfolioCodeSource": "AccountNumber",
    "linkedCodeProperty": "Portfolio/Finbourne-Examples/CustodianAccount",
    "requiredProperties": [
      { "key": "Portfolio/Finbourne-Examples/Custodian", "value": "Acme" }
    ]
  }
}

Enrichment

The Enrichment processor adds extra fields to each record by looking up a value against mappings you specify in the processor configuration or an external API.

Setting

Description

Enrichment source

Where to look up enrichment data: StaticMap or Api

Enrichment key field

The name of the record field whose value is used as the lookup key

On no match

What to do when no field match is found:

  • Ignore: Pass the record through the processor unchanged

  • Default: Apply default values to the record

  • Fail: Stop the pipeline

Default values

Field values to apply when On no match is set to Default
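
The three On no match options can be sketched in Python as follows, using a static map as the enrichment source. The enrich function, the EnrichmentError exception, and the sample data are hypothetical and only illustrate the described behaviour.

```python
class EnrichmentError(Exception):
    """Raised to represent the pipeline stopping in Fail mode."""


def enrich(record, key_field, static_map, on_no_match="Ignore", defaults=None):
    """Add fields looked up by the record's key field value."""
    extra = static_map.get(record.get(key_field))
    if extra is not None:
        return {**record, **extra}
    if on_no_match == "Ignore":
        return dict(record)  # pass through unchanged
    if on_no_match == "Default":
        return {**record, **(defaults or {})}
    if on_no_match == "Fail":
        raise EnrichmentError(f"No match for {key_field}={record.get(key_field)!r}")
    raise ValueError(f"Unknown option: {on_no_match}")


SECTORS = {"US0378331005": {"Sector": "Technology"}}

matched = enrich({"ISIN": "US0378331005"}, "ISIN", SECTORS)
defaulted = enrich({"ISIN": "GB0002634946"}, "ISIN", SECTORS,
                   on_no_match="Default", defaults={"Sector": "Unknown"})
```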

Merge

The Merge processor combines multiple input flows into a single output flow.

Setting

Description

Strategy

How to combine the input flows:

  • Concat: Passes records through as they arrive from an input

  • GroupByKey: Groups incoming records by the specified Merge key field, then outputs the records in order

Merge key field

The name of the record field to sort by when Strategy is set to GroupByKey
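
The difference between the two strategies can be sketched in Python. This is an interpretation rather than the framework's implementation: arrival order is modelled as a list of (input flow, record) pairs, and GroupByKey is assumed to emit records ordered by the merge key, keeping arrival order within each key.

```python
def merge(arrivals, strategy, key_field=None):
    """Combine records from several input flows into one output list."""
    records = [record for _flow, record in arrivals]
    if strategy == "Concat":
        return records  # emitted in arrival order
    if strategy == "GroupByKey":
        # sorted() is stable, so records sharing a key keep their arrival order.
        return sorted(records, key=lambda r: r[key_field])
    raise ValueError(f"Unknown strategy: {strategy}")


arrivals = [
    ("trades", {"Id": "B", "n": 1}),
    ("cash", {"Id": "A", "n": 2}),
    ("trades", {"Id": "A", "n": 3}),
]
concatenated = merge(arrivals, "Concat")
grouped = merge(arrivals, "GroupByKey", key_field="Id")
```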

Query

The Query processor runs a SQL query against a Luminesce, Postgres or Snowflake database and emits each result row as a record. You can run this as either:

  • A standalone query with no input flows; or,

  • A dynamic query, where the processor receives records from an output flow and uses the field values from those records as query parameters

Setting

Description

Database type

Luminesce, Postgres, or Snowflake

Connection string

For non-Luminesce database types: the database connection string; supports SECRET: tokens for credentials

SQL query

The parameterised SQL query to run

Query name

A name to identify your query in logs

Query timeout

How long to wait before the query times out

Static parameters

One or more static key and value pairs to apply on every execution of the query

Dynamic parameters

One or more parameters sourced from your upstream records; note the query runs once per batch of records

Max rows

A safety limit on the number of rows the query can return; you can set the value to -1 to disable the limit

On empty input

What to do when no value exists for a dynamic parameter:

  • Skip: Log the skipped record and complete with no output

  • Error: Stop the pipeline

For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "Query",
  "Sequence": 4,
  "InputFlows": { "default": "trades" },
  "OutputFlows": { "default": "instrument_meta" },
  "Config": {
    "databaseType": "Postgres",
    "connectionString": "Host=ref.Finbourne-Examples.com;Database=ref;SslMode=Require",
    "credentialsKey": "ref-pg",
    "query": "SELECT isin, sector, country FROM reference.instruments WHERE isin = ANY(@isins)",
    "dynamicParameters": [
      { "paramName": "isins", "sourceField": "ISIN", "distinct": true, "batchSize": 500 }
    ],
    "recordType": "InstrumentMeta"
  }
}
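
One plausible reading of the distinct and batchSize keys in the example above is that the processor collects the distinct values of the source field and passes them to the query in batches. The Python sketch below illustrates that reading only; parameter_batches is a hypothetical helper and the exact semantics are an assumption.

```python
def parameter_batches(records, source_field, batch_size, distinct=True):
    """Collect parameter values from upstream records and split them into batches."""
    values = [r[source_field] for r in records if r.get(source_field)]
    if distinct:
        values = list(dict.fromkeys(values))  # de-duplicate, keep first-seen order
    return [values[i:i + batch_size] for i in range(0, len(values), batch_size)]


trades = [{"ISIN": "A"}, {"ISIN": "B"}, {"ISIN": "A"}, {"ISIN": "C"}]
batches = parameter_batches(trades, "ISIN", batch_size=2)
```

Under this reading, each inner list would be bound to the @isins parameter for one execution of the query.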

Deduplication

More information is coming soon.

Data quality check

The Data quality check processor runs data quality rules you configure against each record. You can choose what happens when a rule fails:

  • Flag the record and continue

  • Stop the pipeline

  • Attach the errors to the record for downstream handling

Setting

Description

Check type

The type of data quality validation to run

Field

The record field this rule validates

Fail mode

What to do when a rule fails:

  • Flag: Mark the record as filtered, log a warning, and pass it downstream

  • Reject: Stop the pipeline

  • Report: Attach error details to the record in a _DQErrors field and pass the record downstream

Error message

A message to log if the rule fails

Max value and Min value

Used for Range checks; the upper and lower bounds for the value

Allowed values

Used for InSet checks; enter one or more allowed values

Regex pattern

Used for Regex checks

For example, you could include the following segment in your imported JSON template to automatically populate the processor settings:

{
  "ProcessorType": "DQCheck",
  "Sequence": 5,
  "Config": {
    "failMode": "flag",
    "rules": [
      { "name": "isin-required", "check": "Required",    "field": "ISIN",      "when": "InstrumentType != 'CASH'" },
      { "name": "isin-format",   "check": "Regex",       "field": "ISIN",      "pattern": "^[A-Z]{2}[A-Z0-9]{9}\\d$" },
      { "name": "qty-positive",  "check": "GreaterThan", "field": "Quantity",  "value": "0", "severity": "warning" },
      { "name": "ccy-allowed",   "check": "In",          "field": "Currency",  "values": ["USD","EUR","GBP","JPY"] },
      { "name": "trade-unique",  "check": "UniqueInBatch", "field": "TradeId" },
      { "name": "trade-date",    "check": "DateFormat",  "field": "TradeDate", "format": "yyyyMMdd" }
    ]
  }
}
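
The three fail modes can be sketched in Python using the ISIN rules from the example above. This is illustrative only: the _Filtered marker field and the PipelineStopped exception are hypothetical, while the _DQErrors field name comes from the Fail mode description.

```python
import re

ISIN_PATTERN = re.compile(r"^[A-Z]{2}[A-Z0-9]{9}\d$")


class PipelineStopped(Exception):
    """Represents the pipeline stopping in Reject mode."""


def check_isin(record):
    """Return the names of the ISIN rules that the record fails."""
    isin = record.get("ISIN")
    if not isin:
        return ["isin-required"]
    if not ISIN_PATTERN.match(isin):
        return ["isin-format"]
    return []


def apply_fail_mode(record, errors, fail_mode):
    """Handle a record's failed rules according to the fail mode."""
    if not errors:
        return record
    if fail_mode == "Flag":
        return {**record, "_Filtered": True}  # hypothetical marker field
    if fail_mode == "Reject":
        raise PipelineStopped(", ".join(errors))
    if fail_mode == "Report":
        return {**record, "_DQErrors": errors}
    raise ValueError(f"Unknown fail mode: {fail_mode}")


bad = {"ISIN": "us0378331005"}
reported = apply_fail_mode(bad, check_isin(bad), "Report")
flagged = apply_fail_mode(bad, check_isin(bad), "Flag")
```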

File export

The File export processor turns records into a file and writes the file to your specified destination. You can export files with the following formats:

  • CSV

  • JSON

  • XML

  • Fixed-width

Setting

Description

Output format

The format of the output file: Csv, Json, Xml, or FixedWidth

Include headers

When enabled and Output format is CSV, includes a header row in the file

Delimiter

Used for CSV output files; the column delimiter

Pretty print

When enabled, formats the JSON output file with indentation and line breaks for readability

Column order

Used for CSV and fixed-width output files; an ordered list of field names to explicitly order the columns in the output file

Destination type and Output path

Where to write the output file: Drive, S3, Sftp, or Local

Transform pipeline

Optionally compress or encrypt the output before writing it to the destination: Gzip, Zip, or Pgp

Continue after sink

When enabled, the processor passes the records to the next processor after exporting them as a file

Record type filter

If specified, only process records with this record type attached

Pass through unmatched

Passes records that do not match the Record type filter downstream unchanged (rather than dropping them)
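
The interaction of Include headers, Delimiter, and Column order for CSV output can be sketched with Python's csv module. The to_csv helper is hypothetical, and dropping fields that are not named in the column order is an assumption made for this example.

```python
import csv
import io


def to_csv(records, column_order, delimiter=",", include_headers=True):
    """Render records as CSV text with an explicit column order."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=column_order,
        delimiter=delimiter,
        extrasaction="ignore",  # assumption: fields outside the order are dropped
        lineterminator="\n",
    )
    if include_headers:
        writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()


records = [
    {"Ticker": "AAPL", "Units": 100, "Internal": "x"},
    {"Ticker": "MSFT", "Units": 50},
]
text = to_csv(records, ["Units", "Ticker"], delimiter="|")
```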

Transfer

The Transfer processor moves files from one location to another without parsing them. This can be useful for delivering files to an external system or encrypting files in transit.

LUSID keeps a log of each file the processor transfers.

Setting

Description

Source type and Source path

Where to source the file from: Drive, Sftp, S3, or Local

Delete after transfer

When enabled, the processor deletes the file in the source location after it has been successfully transferred to the destination. If the processor is unable to delete the source file after transfer, it logs the failure as a warning.

Destination type and Destination path

Where to move the file to: Drive, Sftp, S3, or Local

Upsert custom entities

The Upsert custom entities processor creates or updates custom entities in LUSID from records. If the custom entity type does not yet exist in your domain, LUSID creates it for you on the first run.

Setting

Description

Entity type name

The custom entity type name

Display name and Description

A friendly display name and description for the custom entity type

Create type if not exists

When enabled, creates the custom entity type definition automatically on the first run

Identifiers

At least one identifier for the custom entity, consisting of a static Type, a Scope, and a value Source field; see Understanding user-defined identifiers

Field schema

The definitions for each field on the custom entity type; see Defining a custom entity type

Field mappings

Where to source the values for each custom entity field

Property mappings

LUSID properties to decorate onto custom entities with a value from your specified source field

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as custom entities in LUSID

Upsert holdings

The Upsert holdings processor creates or updates holdings in LUSID from records. You can process holdings in two ways:

  • Adjust holdings: Add to the existing portfolio holdings

  • Set holdings: Replace the existing portfolio holdings

Setting

Description

Mode

How the holdings you upsert should behave with your existing holdings in LUSID: either Adjust or Set. Read more on adjusting or setting holdings.

Portfolio scope

The LUSID portfolio scope to write holdings into

Portfolio code source

The record field to source the portfolio code from

EffectiveAt source

The record field to source the effectiveAt datetime from

EffectiveAt format

If the effectiveAt value is not ISO 8601-formatted, specify a date format (as listed in date formats)

Instrument identifier type

The three-stage LUSID instrument identifier key, for example Instrument/default/LusidInstrumentId

<Holdings field> source

The record field to populate each holding value from; read more on LUSID’s holdings fields

Property mappings

Each property mapping configures the processor to decorate a LUSID property onto holdings with a value from your specified source field

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as holdings in LUSID

Aggregate duplicates by instrument

When enabled in Set mode, the processor sums the units for duplicate instrument records to form a single holding

Sort key

In Set mode, the processor buffers and sorts records by this record field before upserting
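
The Aggregate duplicates by instrument option can be pictured as summing units per instrument. The Python sketch below is illustrative only: the field names are hypothetical, aggregation is assumed to happen per portfolio, and non-unit fields are taken from the first record seen for each instrument.

```python
def aggregate_duplicates(records, portfolio_field="PortfolioCode",
                         instrument_field="Luid", units_field="Units"):
    """Sum units for records that share a portfolio and an instrument."""
    holdings = {}
    for record in records:
        key = (record[portfolio_field], record[instrument_field])
        if key in holdings:
            holdings[key][units_field] += record[units_field]
        else:
            holdings[key] = dict(record)  # copy so the input is not modified
    return list(holdings.values())


records = [
    {"PortfolioCode": "P1", "Luid": "LUID_1", "Units": 100},
    {"PortfolioCode": "P1", "Luid": "LUID_1", "Units": 50},
    {"PortfolioCode": "P2", "Luid": "LUID_1", "Units": 10},
]
holdings = aggregate_duplicates(records)
```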

Upsert instrument

The Upsert instrument processor creates or updates instrument definitions in LUSID from records. You can assign instrument types by condition, with a fallback type for records that don’t match a condition.

Setting

Description

Identifier mapping source field

The record field or value source for the identifier

LUSID identifier type

The LUSID instrument identifier type to populate with a value from the source field

Name source field

The record field to use as the instrument name

Default instrument type

A fallback instrument type for the processor to assign when no type condition matches

Domestic currency source

The record field to use as the instrument’s domestic currency

Property mappings

Each property mapping configures the processor to decorate a LUSID property onto instruments with a value from your specified source field

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as instruments in LUSID

Resolved LUID field

When Continue after sink is enabled, uses this name for the field containing the record’s resolved LUID

Upsert legal entities

The Upsert legal entities processor creates or updates legal entities in LUSID from records. Before upserting, the processor deduplicates records within a batch by their idTypeScope.

Setting

Description

Identifier scope

The idTypeScope of the legal entity (the second stage in the 3-stage key); see Understanding legal entity identifiers

Identifier type

The idTypeCode of the legal entity (the third stage in the 3-stage key); see Understanding legal entity identifiers

Identifier value source

The record field to use as the legal entity’s code value; see Understanding legal entity identifiers

Display name source and Description source

The record fields to use as the legal entity’s display name and description respectively

Additional identifiers

Additional identifiers to write alongside the primary identifier, as described here

Property mappings

Each property mapping configures the processor to decorate a LUSID property onto legal entities with a value from your specified source field

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as legal entities in LUSID

Upsert quote

More information is coming soon.

Upsert relational dataset

The Upsert relational dataset processor creates or updates data points in a LUSID relational dataset from records. If the dataset definition does not yet exist in your domain, the processor creates it automatically on the first run.

Setting

Description

Scope and Code

The relational dataset definition scope and code

Display name and Description

A friendly display name and description for the dataset

Create if not exists

When enabled, creates the relational dataset definition automatically on the first run

Column definitions

The field schema for the relational dataset definition: Key columns become series identifiers; non-key columns become value fields. See Anatomy of a dataset.

Entity type

The type of entity to associate each dataset record with; see applicableEntityTypes

Column mappings

Where to source the values for each data point field; see How do I add records to a dataset?

Data series scope

A series scope for each record; see Constructing the primary key

EffectiveAt source

The record field to source the effectiveAt datetime from

EffectiveAt default

If neither EffectiveAt source nor EffectiveAt default is set, the processor defaults to the current UTC time

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as dataset records in LUSID

Upsert transaction

The Upsert transaction processor creates or updates transactions in LUSID from records. The processor groups and upserts records by portfolio.

Setting

Description

Portfolio scope

The LUSID portfolio scope to write transactions into

Portfolio code source

The record field to source the portfolio code from

<Transaction field> source

The record field to populate each value on a transaction from; read more on LUSID’s transaction fields

Property mappings

Each property mapping configures the processor to decorate a LUSID property onto transactions with a value from your specified source field

Continue after sink

When enabled, the processor passes the records to the next processor after upserting them as transactions in LUSID

Record type filter

If specified, only process records with this record type attached

Pass through unmatched

Passes records that do not match the Record type filter downstream unchanged (rather than dropping them)

Notification

The Notification processor sends an email notification at the end of a dataflow run. You can choose whether to send on success, failure, or always, and attach a summary of what the run processed.

Setting

Description

SMTP server

The hostname, port, username, and password for your SMTP server

Use SSL/TLS

Disable this for local testing tools only

From

The sender email address

To

Up to 10 direct recipient email addresses

CC

Up to 10 carbon copy recipient email addresses

BCC

Up to 10 blind carbon copy recipient email addresses

Subject

The email subject line; you can use template tokens to make the text more meaningful

Body

The email body as plain text or HTML; supports template tokens

HTML email

When enabled, the processor sends the email as formatted HTML instead of plain text

Send condition

When to send this notification based on the run state of the dataflow instance: Always, OnSuccess, or OnFailure

Attach flow summary

When enabled, attaches a summary of the run’s record counts to the email as a CSV

Workflow

The Workflow processor creates tasks in the Workflow Service from records - one task per record. You can filter which records trigger a task, assign tasks to specific users or teams, and map record fields onto task fields.

Setting

Description

Task definition scope and Task definition code

The scope and code of the task definition to kick off a task from

Initial state

The workflow state to assign to new tasks

Due date offset

Sets the task due date to the run date plus this number of days and hours

Condition

A LUSID filtering syntax expression that evaluates to true or false; records that do not evaluate to true pass through the processor with no task created

Record type filter

If specified, only process records with this record type attached

Field mappings

Where to source the values for each task field