Creating a custom dataflow integration from a template

LUSID’s Dataflow Editor gives you a visual, flowchart-style view of any custom dataflow integration, showing you:

  • Each processor in the pipeline

  • How data flows between them

  • How the whole dataflow is configured

The fastest way to get started is to import a ready-made JSON template, explore it in the editor, and make your first edit.

In this tutorial you will:

  1. Import a sample integration instance from a JSON template.

  2. Open the Dataflow Editor and walk through a five-processor pipeline, stage by stage.

  3. Understand how raw source data is ingested, reshaped, portfolio- and instrument-resolved, and finally written into LUSID as transactions.

  4. Change the run schedule to match your own requirements.

Step 1: Import the sample JSON

  1. Navigate to Integrations > Dashboard in the LUSID web app.

  2. Click Create instance and choose Integration Dataflow.

  3. Click the Import icon in the top right.

  4. Copy and paste the following JSON sample and click Import:

    {
      "integrationType": "integration-dataflow",
      "name": "Broker Transactions Daily Load",
      "description": "Reads a daily transactions CSV from the broker's SFTP, applies field mappings and derivations, resolves portfolios and instruments against LUSID, and upserts the transactions.",
      "enabled": true,
      "triggers": [
        { "type": "Time", "cronExpression": "0 0 2 * * ?", "timeZone": "America/New_York" }
      ],
      "details": {
        "maxErrors": 100,
        "maxTaskRetries": 2,
        "tasks": [
          {
            "name": "load-broker-transactions",
            "sourceTaskName": "",
            "sortKey": "",
            "inputFlowName": "load-broker-transactions",
            "processors": [
              {
                "processorType": "Ingest",
                "outputMode": "Stream",
                "inputFlows": [],
                "outputFlows": ["raw"],
                "config": {
                  "SourceType": "Drive",
                  "ParserType": "Csv",
                  "path": "/integrations/transactions/TRADES_{{COBDATE:yyyyMMdd}}.csv",
                  "recordPath": "ASSETS/ASSET",
                  "flattenDepth": 2,
                  "flattenSets": true,
                  "setDelimiter": "|",
                  "rowIdentifierField": "Transaction.ID",
                  "recordType": "TransactionId",
                  "batchSize": 500
                }
              },
              {
                "processorType": "MapTransform",
                "outputMode": "Stream",
                "inputFlows":  ["raw"],
                "outputFlows": ["mapped"],
                "config": {
                  "mappings": [
                    { "source": "TradeId",     "target": "TransactionId",   "transform": "TRIM" },
                    { "source": "AccountCode", "target": "PortfolioCode",   "transform": "TRIM" },
                    { "source": "Ticker",      "target": "Ticker",          "transform": "UPPERCASE" },
                    { "source": "ISIN",        "target": "ISIN",            "transform": "UPPERCASE" },
                    { "source": "Currency",    "target": "Currency",        "transform": "UPPERCASE" },
                    { "source": "Side",        "target": "Side",            "transform": "UPPERCASE" },
                    { "source": "TradeDate",   "target": "TransactionDate" },
                    { "source": "SettleDate",  "target": "SettlementDate" },
                    { "source": "Quantity",    "target": "Units" },
                    { "source": "UnitPrice",   "target": "Price" }
                  ],
                  "ValueSources": [
                    {
                      "name": "TotalConsideration",
                      "mathematicalExpression": "Units * Price",
                      "defaultValue": "0"
                    }
                  ],
    
                  "recordTypeMappings": [
                    { "recordType": "CashTransaction", "condition": "InstrumentType == 'CASH'" },
                    { "recordType": "Trade",           "condition": "InstrumentType != 'CASH'" },
                    { "recordType": "Filtered",        "condition": "Quantity == '0'" }
                  ],
    
                  "ConditionalValueSources": [
                    {
                      "name": "LusidType",
                      "logicalExpressions": [
                        { "when": "Side == 'BUY'  && RecordType == 'Trade'", "value": "Buy" },
                        { "when": "Side == 'SELL' && RecordType == 'Trade'", "value": "Sell" },
                        { "when": "RecordType == 'CashTransaction' && Side == 'CREDIT'", "value": "FundsIn" },
                        { "when": "RecordType == 'CashTransaction' && Side == 'DEBIT'",  "value": "FundsOut" }
                      ],
                      "defaultValue": "Buy"
                    }
                  ]
                }
              },
    
              {
                "processorType": "PortfolioResolver",
                "outputMode": "Stream",
                "inputFlows":  ["mapped"],
                "outputFlows": ["resolved_portfolios", "unresolved_portfolios"],
                "config": {
                  "resolutionMode": "Direct",
                  "portfolioScope": "Finbourne-Examples",
                  "portfolioCodeSource": "PortfolioCode",
                  "resolvedScopeField":  "_ResolvedPortfolioScope",
                  "resolvedCodeField":   "_ResolvedPortfolioCode",
                  "resolvedOutputKey":   "resolved_portfolios",
                  "unresolvedOutputKey": "unresolved_portfolios",
                  "batchSize": 200,
                  "sortKey": "PortfolioCode"
                }
              },
    
              {
                "processorType": "InstrumentResolver",
                "outputMode": "Stream",
                "inputFlows":  ["resolved_portfolios"],
                "outputFlows": ["resolved_instruments", "unresolved_instruments"],
                "config": {
                  "identifierHierarchy": [
                    { "type": "Isin",     "source": "ISIN" },
                    { "type": "Cusip",    "source": "CUSIP" },
                    { "type": "Sedol",    "source": "SEDOL" },
                    { "type": "Currency", "source": "Currency" }
                  ],
                  "resolvedLuidField": "_ResolvedLuid",
                  "resolvedIdentifierTypeField": "_ResolvedIdentifierType",
                  "resolvedOutputKey":   "resolved_instruments",
                  "unresolvedOutputKey": "unresolved_instruments",
                  "searchScope": "default",
                  "batchSize": 200,
                  "sortKey": "PortfolioCode"
                }
              },
    
              {
                "processorType": "UpsertTransaction",
                "outputMode": "Stream",
                "inputFlows":  ["resolved_instruments"],
                "outputFlows": [],
                "config": {
                  "scope": "Finbourne-Examples",
                  "portfolioCodeSource": "_ResolvedPortfolioCode",
                  "transactionIdSource": "TransactionId",
                  "typeSource": "LusidType",
                  "transactionDateSource": "TransactionDate",
                  "transactionDateFormat": "yyyy-MM-dd",
                  "settlementDateSource":  "SettlementDate",
                  "settlementDateFormat":  "yyyy-MM-dd",
                  "dateTimeZone": "America/New_York",
                  "unitsSource": "Units",
                  "priceSource": "Price",
                  "totalConsiderationAmountSource":  "TotalConsideration",
                  "totalConsiderationCurrencySource": "Currency",
                  "source": "Finbourne-Examples",
    
                  "instrumentIdentifierType":   "LusidInstrumentId",
                  "instrumentIdentifierSource": "_ResolvedLuid",
    
                  "identifierMappings": [
                    { "lusidIdentifierType": "Isin",  "sourceField": "ISIN" },
                    { "lusidIdentifierType": "Cusip", "sourceField": "CUSIP" },
                    { "lusidIdentifierType": "Sedol", "sourceField": "SEDOL" }
                  ],
    
                  "propertyMappings": [
                    { "Unit": null,  "DataType": "String",  "PropertyKey": "Transaction/Finbourne-Examples/TraderId",   "SourceField": "TraderId" },
                    { "Unit": null,  "DataType": "String",  "PropertyKey": "Transaction/Finbourne-Examples/Strategy",   "SourceField": "Strategy" },
                    { "Unit": "USD", "DataType": "Decimal", "PropertyKey": "Transaction/Finbourne-Examples/Commission", "SourceField": "Commission" }
                  ],
    
                  "batchSize": 200,
                  "sortKey": "PortfolioCode",
                  "continueAfterSink": false
                }
              }
    
            ]
          }
        ]
      },
      "postProcessTasks": []
    }

The dashboard displays the pipeline configuration from the JSON file as a flowchart, with each box representing a processor. Arrows show the direction data flows between the processors as output flows. Read more on output flows.
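One way to read the flowchart back from the JSON is to pair each processor's outputFlows with the inputFlows of the others. The sketch below does that for an abbreviated copy of the template. The helper names derive_arrows and unconsumed_flows are hypothetical, and this is only a reading aid, not a description of how the editor itself builds the diagram.

```python
# Abbreviated from the template: only the fields that define the wiring.
processors = [
    {"processorType": "Ingest", "inputFlows": [], "outputFlows": ["raw"]},
    {"processorType": "MapTransform", "inputFlows": ["raw"], "outputFlows": ["mapped"]},
    {"processorType": "PortfolioResolver", "inputFlows": ["mapped"],
     "outputFlows": ["resolved_portfolios", "unresolved_portfolios"]},
    {"processorType": "InstrumentResolver", "inputFlows": ["resolved_portfolios"],
     "outputFlows": ["resolved_instruments", "unresolved_instruments"]},
    {"processorType": "UpsertTransaction", "inputFlows": ["resolved_instruments"],
     "outputFlows": []},
]


def derive_arrows(processors):
    """Return (producer, flow, consumer) for every flow another processor reads."""
    arrows = []
    for producer in processors:
        for flow in producer["outputFlows"]:
            for consumer in processors:
                if flow in consumer["inputFlows"]:
                    arrows.append(
                        (producer["processorType"], flow, consumer["processorType"])
                    )
    return arrows


def unconsumed_flows(processors):
    """Return output flows that no processor reads (hypothetical helper)."""
    consumed = {flow for p in processors for flow in p["inputFlows"]}
    return [f for p in processors for f in p["outputFlows"] if f not in consumed]


for producer, flow, consumer in derive_arrows(processors):
    print(f"{producer} --{flow}--> {consumer}")
```

In this template the two unresolved flows have no consumer, which matches the walkthrough: unresolved records are not routed to another processor.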

Step 2: Customise the pipeline

Ingest processor

The Ingest processor is where the dataflow pipeline begins. In this tutorial, we’ll use it to read a source file from LUSID Drive and turn the raw rows into records that subsequent processors can work with.

Select the Ingest processor to open its configuration panel in the editor:

All records flow out of the Ingest processor as an output flow called raw. You can select the arrow between Ingest and Map Transform to view the output flow.

Output mode is set to Stream because we don’t need to write any records to storage at this point.

Data source

Note the following:

  • Source Type is Drive for this tutorial, but you can change this to Api, Sftp, and more to suit your use case.

  • At runtime, the integration replaces the Path token {{COBDATE:yyyyMMdd}} with the run date, so on 29 May 2025 the file read would be TRADES_20250529.csv. This means the integration always uses the file for the current run date without requiring manual path changes.
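To make the token substitution concrete, here is a minimal Python sketch of the same idea. The function resolve_cobdate is hypothetical and handles only the yyyy, MM and dd pattern letters. The full set of patterns the integration supports is not covered in this tutorial, so treat that subset as an assumption.

```python
import re
from datetime import date

# Assumed subset of pattern letters; the integration may support more.
_PATTERN_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}


def resolve_cobdate(path: str, run_date: date) -> str:
    """Replace every {{COBDATE:<pattern>}} token in path with the formatted run date."""

    def format_token(match: re.Match) -> str:
        pattern = match.group(1)
        # Translate yyyy/MM/dd into strftime directives; other characters pass through.
        strftime_pattern = re.sub(
            "yyyy|MM|dd", lambda m: _PATTERN_TO_STRFTIME[m.group(0)], pattern
        )
        return run_date.strftime(strftime_pattern)

    return re.sub(r"\{\{COBDATE:([^}]+)\}\}", format_token, path)


template = "/integrations/transactions/TRADES_{{COBDATE:yyyyMMdd}}.csv"
print(resolve_cobdate(template, date(2025, 5, 29)))
# /integrations/transactions/TRADES_20250529.csv
```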

Parser

Note the following:

  • Parser type is set to Csv with a pipe (|) delimiter and header row.

  • Validate row count is enabled, so the integration expects a trailer row matching the pattern TRAILER|<count> and uses it to verify the row count in the file.
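The parser settings above can be illustrated with a short Python sketch that reads pipe-delimited text with a header row and checks the trailer count. The function parse_with_trailer is a hypothetical stand-in written for this example. How the integration itself reports a mismatch is not stated here, so raising ValueError is an assumption.

```python
import csv
import io


def parse_with_trailer(text: str, delimiter: str = "|") -> list[dict]:
    """Parse delimited text with a header row and a final TRAILER|<count> row.

    Raises ValueError if the trailer is missing or its count does not match
    the number of data rows.
    """
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines or not lines[-1].startswith("TRAILER" + delimiter):
        raise ValueError("missing trailer row")

    expected = int(lines[-1].split(delimiter)[1])
    # Everything before the trailer is the header row plus the data rows.
    reader = csv.DictReader(io.StringIO("\n".join(lines[:-1])), delimiter=delimiter)
    rows = list(reader)

    if len(rows) != expected:
        raise ValueError(f"trailer says {expected} rows, file has {len(rows)}")
    return rows


sample = """TradeId|AccountCode|Quantity
TXN-001|PORT-EQ-001|500
TXN-002|PORT-EQ-001|200
TRAILER|2
"""
rows = parse_with_trailer(sample)
print(len(rows), rows[0]["TradeId"])  # 2 TXN-001
```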

Output

Record type is set to BrokerTransaction. Every record that leaves this processor carries that label, which downstream processors can use for routing and filtering.

Map Transform

The broker’s CSV we want to import data from looks like this:

TradeId|AccountCode|Ticker|ISIN|CUSIP|SEDOL|InstrumentType|Currency|Side|TradeDate|SettleDate|Quantity|UnitPrice|TraderId|Strategy|Commission
TXN-001|PORT-EQ-001|AAPL|US0378331005|037833100|2046251|EQUITY|USD|BUY|2025-05-29|2025-06-02|500|189.25|TR-42|Growth|12.50
TXN-002|PORT-EQ-001|MSFT|US5949181045|594918104|B7TL820|EQUITY|USD|SELL|2025-05-29|2025-06-02|200|415.80|TR-42|Growth|8.20
TXN-003|PORT-FI-001|TSLA|US88160R1014|88160R101|BVYMTL8|EQUITY|USD|BUY|2025-05-29|2025-06-02|100|177.90|TR-19|Momentum|4.75
TXN-004|PORT-FI-001|||||CASH|GBP|CREDIT|2025-05-29|2025-05-29|50000|1|||
TXN-005|PORT-EQ-001|NOVO B|DK0062498333|N/A|B3KDMN0|EQUITY|DKK|BUY|2025-05-29|2025-06-02|0|850.00|TR-42|Growth|0
TRAILER|5

The CSV uses its own column names and conventions. Map Transform shapes each record into the form that LUSID and downstream processors expect, without affecting the source file. The records then flow out of the processor as the output flow mapped.

To interact with mappings, select Map Transform > Mapping builder:

Field mappings

Field mappings allow you to rename columns from the CSV or transform their values. The imported JSON template prepopulates these mappings, for example:

  • TradeId becomes TransactionId

  • AccountCode becomes PortfolioCode

  • Ticker values are transformed into uppercase
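As a rough illustration of what these mappings do to one record, the sketch below applies a subset of the template's mappings in Python. The function apply_mappings and the TRANSFORMS table are hypothetical. The sketch also assumes the original column is kept alongside the renamed one, because the template's later conditions still refer to Quantity. The processor's actual behaviour may differ.

```python
# A subset of the template's mappings; "transform" is optional.
MAPPINGS = [
    {"source": "TradeId", "target": "TransactionId", "transform": "TRIM"},
    {"source": "AccountCode", "target": "PortfolioCode", "transform": "TRIM"},
    {"source": "Ticker", "target": "Ticker", "transform": "UPPERCASE"},
    {"source": "Quantity", "target": "Units"},
]

# Only the two transforms used in the template are modelled here.
TRANSFORMS = {"TRIM": str.strip, "UPPERCASE": str.upper}


def apply_mappings(record: dict, mappings: list) -> dict:
    """Return a copy of record with each mapped value written to its target field."""
    result = dict(record)  # assumption: unmapped and source fields pass through
    for mapping in mappings:
        source = mapping["source"]
        if source not in record:
            continue
        value = record[source]
        transform = TRANSFORMS.get(mapping.get("transform"))
        if transform is not None:
            value = transform(value)
        result[mapping["target"]] = value
    return result


raw = {"TradeId": " TXN-001 ", "AccountCode": "PORT-EQ-001",
       "Ticker": "aapl", "Quantity": "500"}
mapped = apply_mappings(raw, MAPPINGS)
print(mapped["TransactionId"], mapped["PortfolioCode"], mapped["Ticker"], mapped["Units"])
# TXN-001 PORT-EQ-001 AAPL 500
```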

Value sources

Value sources enable you to specify how to extract and transform data from the columns in the file. Learn more about value sources.

For example:

  • TotalConsideration derives a new field by multiplying Units by Price.

  • LusidType derives the LUSID transaction type from the combination of Side and RecordType, so:

    • A BUY trade becomes Buy

    • A SELL trade becomes Sell

    • A cash credit becomes FundsIn

    • A cash debit becomes FundsOut

In both examples, no equivalent field exists in the source file: each value is computed entirely within the dataflow pipeline and later passed to Upsert Transaction as the total consideration amount and transaction type respectively.
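The two derived fields can be sketched in Python as follows. The rules and defaults mirror the ValueSources and ConditionalValueSources entries in the template, but the function names are hypothetical. The use of Decimal arithmetic is also an assumption, since the numeric type used by the pipeline is not stated.

```python
from decimal import Decimal


def total_consideration(record: dict) -> Decimal:
    """Units * Price, falling back to the template's default value of 0."""
    try:
        return Decimal(record["Units"]) * Decimal(record["Price"])
    except (KeyError, TypeError, ArithmeticError):
        # Missing or non-numeric input: use the default.
        return Decimal("0")


# (Side, RecordType) -> LUSID transaction type, mirroring the template's rules.
LUSID_TYPE_RULES = {
    ("BUY", "Trade"): "Buy",
    ("SELL", "Trade"): "Sell",
    ("CREDIT", "CashTransaction"): "FundsIn",
    ("DEBIT", "CashTransaction"): "FundsOut",
}


def lusid_type(record: dict) -> str:
    """Look up the transaction type, defaulting to Buy as the template does."""
    key = (record.get("Side"), record.get("RecordType"))
    return LUSID_TYPE_RULES.get(key, "Buy")


trade = {"Side": "BUY", "RecordType": "Trade", "Units": "500", "Price": "189.25"}
print(total_consideration(trade), lusid_type(trade))  # 94625.00 Buy
```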

Record types

Record type mappings allow you to tag records based on conditions (using LUSID filtering syntax) you specify, for example:

  • Records where InstrumentType eq 'CASH' are tagged CashTransaction; all others become Trade

  • Records where Quantity eq '0' are tagged Filtered so downstream processors can skip the record
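The tagging rules can be sketched as a small Python function. The name tag_record is hypothetical. The tutorial does not state how overlapping conditions are ordered, so the sketch assumes the zero-quantity rule takes precedence over the other two; check the behaviour in your own environment before relying on it.

```python
def tag_record(record: dict) -> str:
    """Return the record type for one row of the broker file."""
    # Assumption: the zero-quantity rule wins over the instrument-type rules.
    if record.get("Quantity") == "0":
        return "Filtered"
    if record.get("InstrumentType") == "CASH":
        return "CashTransaction"
    return "Trade"


rows = [
    {"TradeId": "TXN-001", "InstrumentType": "EQUITY", "Quantity": "500"},
    {"TradeId": "TXN-004", "InstrumentType": "CASH", "Quantity": "50000"},
    {"TradeId": "TXN-005", "InstrumentType": "EQUITY", "Quantity": "0"},
]
tags = {row["TradeId"]: tag_record(row) for row in rows}
print(tags)
```

Applied to the sample file, TXN-004 would be tagged CashTransaction and TXN-005 would be tagged Filtered under this assumption.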

Portfolio Resolver

The Portfolio Resolver processor tells LUSID which portfolio a transaction should belong to.

Before a transaction can be upserted to LUSID, the processor must match each record’s PortfolioCode field (renamed from AccountCode by the Map Transform processor) to an existing LUSID portfolio.

Resolution

In this tutorial, the Resolution mode is set to Direct.

  • Portfolio Scope is set to the static value Finbourne-Examples.

  • The processor uses the PortfolioCode value to look up the LUSID portfolio code for each record.

Note

The processor caches results for the current integration run so lookups for the same portfolio aren’t repeated.

Output fields and flows

If the processor successfully matches a record to a LUSID portfolio, it’s considered a resolved record. The processor attaches the fields _ResolvedPortfolioScope and _ResolvedPortfolioCode to each resolved record for the Upsert Transaction processor to use.

Conversely, a record without a portfolio match is considered unresolved and, for this tutorial, not routed to another processor.

  • Resolved records flow out of the processor as the output flow resolved_portfolios

  • Unresolved records flow out of the processor as the output flow unresolved_portfolios
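The resolve, cache and split behaviour described above can be sketched in Python. Everything here is illustrative: resolve_portfolios is a hypothetical function, and portfolio_exists stands in for the lookup against LUSID, whose real interface this tutorial does not show.

```python
def resolve_portfolios(records, scope, portfolio_exists):
    """Split records into (resolved, unresolved) using a per-run lookup cache.

    portfolio_exists(scope, code) is a hypothetical stand-in for the LUSID lookup.
    """
    cache = {}
    resolved, unresolved = [], []
    for record in records:
        code = record.get("PortfolioCode")
        if code not in cache:
            # Look each distinct code up once per run; empty codes never resolve.
            cache[code] = bool(code) and portfolio_exists(scope, code)
        if cache[code]:
            resolved.append({
                **record,
                "_ResolvedPortfolioScope": scope,
                "_ResolvedPortfolioCode": code,
            })
        else:
            unresolved.append(record)
    return resolved, unresolved


known = {"PORT-EQ-001"}
lookups = []


def fake_lookup(scope, code):
    lookups.append(code)
    return code in known


records = [
    {"TransactionId": "TXN-001", "PortfolioCode": "PORT-EQ-001"},
    {"TransactionId": "TXN-002", "PortfolioCode": "PORT-EQ-001"},
    {"TransactionId": "TXN-999", "PortfolioCode": "PORT-XX-999"},
]
resolved, unresolved = resolve_portfolios(records, "Finbourne-Examples", fake_lookup)
print(len(resolved), len(unresolved), lookups)
```

The repeated PORT-EQ-001 record triggers only one lookup, which is the effect of the per-run cache.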

Instrument Resolver

The Instrument Resolver processor tells LUSID which instrument each transaction pertains to.

For this tutorial, we want the processor to resolve each record to a unique LUSID instrument by trying an ordered hierarchy of instrument identifiers in turn:

  1. ISIN

  2. CUSIP

  3. SEDOL

  4. Currency

Note

Currency is set as the final fallback identifier so that cash records, which carry no other identifiers, still resolve (as long as a currency instrument exists in LUSID).

Similarly to the Portfolio Resolver, our output splits into resolved and unresolved records:

  • Successfully resolved records flow out of the processor as the output flow resolved_instruments with the following fields attached:

    • _ResolvedLuid

    • _ResolvedIdentifierType

  • Unresolved records flow out of the processor as the output flow unresolved_instruments and are not routed to another processor
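The fallback through the identifier hierarchy can be sketched in Python. The function resolve_instrument is hypothetical, find_luid stands in for the LUSID instrument lookup, and the LUID values in the fake instrument master are invented for the example.

```python
# (LUSID identifier type, source field), in the template's order.
IDENTIFIER_HIERARCHY = [
    ("Isin", "ISIN"),
    ("Cusip", "CUSIP"),
    ("Sedol", "SEDOL"),
    ("Currency", "Currency"),
]


def resolve_instrument(record, find_luid):
    """Try each identifier in order; return the enriched record, or None.

    find_luid(identifier_type, value) is a hypothetical stand-in for the LUSID
    lookup and returns a LUID string or None.
    """
    for identifier_type, source_field in IDENTIFIER_HIERARCHY:
        value = record.get(source_field)
        if not value:
            continue  # for example, cash rows have no ISIN, CUSIP or SEDOL
        luid = find_luid(identifier_type, value)
        if luid is not None:
            return {
                **record,
                "_ResolvedLuid": luid,
                "_ResolvedIdentifierType": identifier_type,
            }
    return None


# Invented instrument master for illustration only.
master = {("Isin", "US0378331005"): "LUID_EXAMPLE_1", ("Currency", "GBP"): "CCY_GBP"}

equity = {"ISIN": "US0378331005", "CUSIP": "037833100", "Currency": "USD"}
cash = {"ISIN": "", "CUSIP": "", "SEDOL": "", "Currency": "GBP"}
unknown = {"ISIN": "XX0000000000", "Currency": "ZZZ"}

for row in (equity, cash, unknown):
    result = resolve_instrument(row, lambda t, v: master.get((t, v)))
    print(result["_ResolvedIdentifierType"] if result else "unresolved")
```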

Upsert Transaction

Finally, the Upsert Transaction processor writes the fully resolved and enriched records into LUSID as transactions.

For this tutorial, the processor:

  • Loads transactions into the Finbourne-Examples Scope

  • Sources the Instrument identifier from _ResolvedLuid (as configured in the Instrument Resolver)

  • Sources the Transaction type from LusidType (as configured in Map Transform)

  • Interprets dates in the records as being in the specified Date timezone (America/New_York in the template) and converts them to UTC before loading into LUSID

  • Writes three custom properties onto each transaction:

    • TraderId

    • Strategy

    • Commission

    Note that each property definition must already exist in your LUSID domain for this to succeed.
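To see what the time zone conversion means for a single value, the sketch below converts a yyyy-MM-dd date from America/New_York to UTC in Python. It assumes a date without a time component is read as local midnight, which the tutorial does not confirm. The function to_utc is hypothetical.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def to_utc(date_text: str, tz_name: str = "America/New_York") -> datetime:
    """Read a yyyy-MM-dd date as local midnight in tz_name and convert it to UTC."""
    # Assumption: a date-only value means 00:00:00 local time.
    local = datetime.strptime(date_text, "%Y-%m-%d").replace(tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


print(to_utc("2025-05-29").isoformat())  # 2025-05-29T04:00:00+00:00
```

New York is four hours behind UTC on that date because daylight saving time is in effect, so the same calendar date maps to 04:00 UTC.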

Step 3: Update the run schedule

The imported integration is configured to run every day at 2am in the America/New_York time zone. You can change the schedule to a time that suits your requirements:

  1. Select the Execution settings icon in the top-right of the editor.

  2. Update the Schedule cron expression and time zone to your preferred schedule. For example, to run at 7am London time, enter 0 0 7 * * ? and set the time zone to Europe/London.

  3. Click Save.

The trigger is now set to your preferred schedule. LUSID will kick off subsequent runs of the integration instance using this schedule.
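The cron expressions in this tutorial have six space-separated fields. The sketch below labels them in Python, assuming a seconds-first layout (seconds, minutes, hours, day of month, month, day of week). That assumption is consistent with 0 0 2 * * ? meaning 2am every day, but it is an inference, not something this page states. The function split_cron is hypothetical.

```python
# Assumed field order, inferred from "0 0 2 * * ?" running daily at 2am.
FIELD_NAMES = ["seconds", "minutes", "hours", "day_of_month", "month", "day_of_week"]


def split_cron(expression: str) -> dict:
    """Label the six space-separated fields of an expression like '0 0 7 * * ?'."""
    fields = expression.split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))


schedule = split_cron("0 0 7 * * ?")
print(schedule["hours"], schedule["minutes"], schedule["seconds"])  # 7 0 0
```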

Next steps

  • Point this integration at your own source file: update the path value on the Ingest processor to match the location of your CSV in LUSID Drive (or another source, such as your own SFTP server).

  • Route unresolved instruments to an Upsert Instrument processor instead of dropping them.

  • Monitor a run and inspect the logs.