LUSID empowers you to build custom dataflow integrations - a flexible, configuration-driven way to load data from third-party sources into LUSID.
Rather than writing custom code for each integration, you describe what you want to happen in a JSON document. LUSID reads the JSON and runs the dataflow pipeline for you, handling:
Ingestion
Transformation
Instrument resolution
Data quality checks
Loading into LUSID
This article explains the key concepts you should understand before configuring or modifying a custom dataflow integration.
Integration instance
You create each custom dataflow integration as an integration instance. The instance controls when the integration you’ve configured runs, whether it’s currently active, and what happens after the run finishes.
Custom dataflow integration instances have four key parts:
Dataflow definition: A configuration file that defines what data to process and how
Enabled status: An on/off switch - when set to false, triggers do not kick off runs of the instance
Triggers: One or more schedules that automate runs of the integration instance
Post-process tasks: After the run completes, you can:
Chain integrations together
Send a notification
Kick off a LUSID workflow
Dataflow definition
Each custom dataflow integration instance has a dataflow definition - the JSON file that configures what the integration does.
A dataflow definition is made up of one or more tasks that execute in an ordered sequence. Every task contains a pipeline of processors that also run in order. In other words:
Raw data enters at one end.
The data passes through a series of processors.
Processed data emerges, ready to be loaded into LUSID.

You can control how a dataflow behaves in the event of a failure using the following fields:
MaxErrors: The number of record-level errors to tolerate before stopping a run.
MaxTaskRetries: How many times to retry a failing task.
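To make the two failure controls concrete, here is a minimal Python sketch of how a record-level error budget and a task retry limit could behave. It is a conceptual model, not LUSID's implementation: `run_task`, `run_with_retries`, `RunStopped`, `TaskFailed` and `parse_amount` are hypothetical names. Two assumptions are baked in: the run stops once the error count exceeds the budget, and a retry limit of N allows N + 1 attempts in total. LUSID's exact semantics may differ.

```python
from typing import Callable, Iterable


class RunStopped(Exception):
    """Hypothetical: raised when record-level errors exceed the budget."""


class TaskFailed(Exception):
    """Hypothetical: raised when a whole task fails (for example, a source is unreachable)."""


def parse_amount(record: dict) -> dict:
    # float() raises ValueError for non-numeric text, which counts as a record-level error.
    return {**record, "amount": float(record["amount"])}


def run_task(records: Iterable[dict], process: Callable[[dict], dict], max_errors: int) -> list:
    """Process records one by one, tolerating up to max_errors record-level failures."""
    processed, errors = [], 0
    for record in records:
        try:
            processed.append(process(record))
        except ValueError:
            errors += 1
            # Assumption: stop as soon as the error count exceeds the budget.
            if errors > max_errors:
                raise RunStopped(f"{errors} record errors exceed MaxErrors={max_errors}")
    return processed


def run_with_retries(task: Callable[[], list], max_task_retries: int) -> list:
    """Run a task once, then retry up to max_task_retries more times if it fails."""
    attempts = max_task_retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except TaskFailed:
            if attempt == attempts:
                raise  # Retries exhausted: surface the failure.
```

With a budget of 1, a batch containing one unparseable amount still completes; with a budget of 0, the same batch stops the run.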
Processors
A processor is a single step in a task’s pipeline, for example ingesting or transforming some data.
Processors read from and write to flows that pass records from one processor to the next using matching names. For example:
{ "ProcessorType": "Ingest", "OutputFlows": { "default": "raw" } },
{ "ProcessorType": "MapTransform", "InputFlows": { "default": "raw" }, "OutputFlows": { "default": "mapped" } },
{ "ProcessorType": "UpsertTransaction", "InputFlows": { "default": "mapped" } }
In the example above:
Ingest writes to a flow called raw.
MapTransform reads from raw and writes to a flow called mapped.
UpsertTransaction reads from mapped.
Some processors produce more than one output flow. InstrumentResolver, for example, writes records to a resolved or unresolved flow. You could then route unresolved records to an UpsertInstrument processor while sending resolved records straight to UpsertTransaction.
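The name-matching rule can be sketched in a few lines of Python. The function below takes processor entries shaped like the example above and derives which processor feeds which. It is an illustration, not LUSID code: `wire_flows` is a hypothetical helper, and the `resolved` and `unresolved` keys used for the InstrumentResolver output flows are assumed rather than taken from the LUSID schema.

```python
def wire_flows(processors: list) -> list:
    """Return (producer, consumer, flow_name) edges by matching flow names."""
    producers = {}
    for processor in processors:
        for flow_name in processor.get("OutputFlows", {}).values():
            producers[flow_name] = processor["ProcessorType"]

    edges = []
    for processor in processors:
        for flow_name in processor.get("InputFlows", {}).values():
            if flow_name not in producers:
                # An input flow that no processor writes to is a wiring mistake.
                raise ValueError(
                    f"{processor['ProcessorType']} reads unknown flow '{flow_name}'"
                )
            edges.append((producers[flow_name], processor["ProcessorType"], flow_name))
    return edges


# Branching pipeline: unresolved records go to UpsertInstrument,
# resolved records go straight to UpsertTransaction.
PIPELINE = [
    {"ProcessorType": "Ingest", "OutputFlows": {"default": "raw"}},
    {
        "ProcessorType": "InstrumentResolver",
        "InputFlows": {"default": "raw"},
        # Assumed output keys; check the real schema before relying on them.
        "OutputFlows": {"resolved": "resolved", "unresolved": "unresolved"},
    },
    {"ProcessorType": "UpsertInstrument", "InputFlows": {"default": "unresolved"}},
    {"ProcessorType": "UpsertTransaction", "InputFlows": {"default": "resolved"}},
]

edges = wire_flows(PIPELINE)
```

Here `edges` contains one entry per connection, such as `("InstrumentResolver", "UpsertInstrument", "unresolved")`. A misspelled flow name raises an error rather than silently dropping records, which is one way to catch wiring mistakes before a run.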
Output modes
Every processor has an OutputMode that controls what happens to records as they leave that processor:
Stream: The output flow passes records to the next processor without writing anything to storage. Use this output mode when you only need the records to flow forwards (typically in the middle of a pipeline).
Persist: The output flow writes records to LUSID’s output-record store and stops the records there - no records flow forwards. Use this output mode at the end of a pipeline when you need a lasting record of what was processed and have no further processors to route records to.
PersistAndContinue: The output flow both writes records to storage and streams them to the next processor. Use this output mode mid-pipeline when a record ID is only available once the record has been stored, for example when a later processor needs to reference a previously stored record.
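The difference between the three modes comes down to two questions: is the record stored, and is it forwarded? The Python sketch below models just that. The `OutputMode` enum and the `emit` function are hypothetical illustrations, and a plain list stands in for LUSID's output-record store.

```python
from enum import Enum


class OutputMode(Enum):
    STREAM = "Stream"
    PERSIST = "Persist"
    PERSIST_AND_CONTINUE = "PersistAndContinue"


def emit(records: list, mode: OutputMode, store: list) -> list:
    """Apply an output mode: store records if required, return those passed forwards."""
    if mode in (OutputMode.PERSIST, OutputMode.PERSIST_AND_CONTINUE):
        store.extend(records)  # Stand-in for writing to the output-record store.
    if mode is OutputMode.PERSIST:
        return []  # Persist ends the flow: nothing moves forwards.
    return list(records)
```

| Mode | Stored | Forwarded |
| --- | --- | --- |
| Stream | No | Yes |
| Persist | Yes | No |
| PersistAndContinue | Yes | Yes |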
Processor types
Ingestion
Every pipeline must begin with an Ingest processor. This processor:
Reads data from a source; available sources include:
A file in LUSID Drive
An SFTP server
An API
AWS S3 bucket
ActiveMQ
MongoDB
PostgreSQL
Snowflake
LUSID itself
Parses the data into records; available parsers include:
CSV
JSON
XML
Fixed-width
Excel (.xlsx and .xls)
FIX
SWIFT MT messages
Passes the records onto the next processor
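As a rough picture of the "parse into records" step, the sketch below turns a CSV payload into a list of records using Python's standard `csv` module. It only illustrates the idea of one record per row keyed by column header; `parse_csv` is a hypothetical name and the sample columns are made up. LUSID's own parsers are configured in the dataflow definition, not written by hand.

```python
import csv
import io


def parse_csv(payload: str) -> list:
    """Parse CSV text into one dict per row, keyed by the header row."""
    return [dict(row) for row in csv.DictReader(io.StringIO(payload))]


records = parse_csv("id,amount\nT1,100\nT2,250\n")
# Every value arrives as text; later processors are responsible for any conversion.
```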
Transformation
The MapTransform processor allows you to:
Rename fields
Apply string transformations
Tag records with a type
Split a single record into multiple records, for example splitting an FX forward into buy/sell legs
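The Python sketch below walks through the four operations on a single FX forward record: renaming fields, normalising a string, tagging a record type, and splitting into legs. All field names (`trade_ref`, `buy_ccy`, `Leg` and so on), the `FxForwardLeg` type and both helper functions are hypothetical. In a real integration you would express these rules in the MapTransform configuration, not in Python.

```python
# Hypothetical mapping from source column names to target field names.
RENAMES = {
    "trade_ref": "TransactionId",
    "buy_ccy": "BuyCurrency",
    "sell_ccy": "SellCurrency",
}


def rename_fields(record: dict, renames: dict) -> dict:
    """Rename any field listed in renames; leave other fields unchanged."""
    return {renames.get(key, key): value for key, value in record.items()}


def split_fx_forward(record: dict) -> list:
    """Split one FX forward record into a buy leg and a sell leg."""
    common = {
        "TransactionId": record["TransactionId"],
        "RecordType": "FxForwardLeg",  # Tag each leg with a record type.
    }
    buy = {
        **common,
        "Leg": "Buy",
        "Currency": record["BuyCurrency"].strip().upper(),  # String transformation.
        "Amount": record["buy_amount"],
    }
    sell = {
        **common,
        "Leg": "Sell",
        "Currency": record["SellCurrency"].strip().upper(),
        "Amount": record["sell_amount"],
    }
    return [buy, sell]


raw = {
    "trade_ref": "FX001",
    "buy_ccy": " gbp ",
    "sell_ccy": "usd",
    "buy_amount": 100.0,
    "sell_amount": 127.0,
}
legs = split_fx_forward(rename_fields(raw, RENAMES))
```

One input record becomes two output records that share a `TransactionId` but carry different currencies and amounts.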
Record types
Record types have two uses:
Processors can filter by record type, for example to make a SinkFile processor only write Cash record types to a file.
Enhance monitoring via the LUSID dashboard; the integration uses record type tags to populate status logs, from PayloadAcquired to ResourceLoaded.
You can set a record’s RecordType initially with the Ingest processor, or update it using MapTransform’s recordTypeMappings field.
You can set a special record type _Filtered = true on a record to pass it through processors without loading it into LUSID. This allows you to discard unwanted records without losing visibility of them.
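A short Python sketch can show how these two ideas differ: filtering by record type selects which records a processor acts on, while the `_Filtered` marker keeps a record visible but excludes it from loading. Modelling `RecordType` and `_Filtered` as keys on a dictionary is an assumption made for illustration, and `select_by_type` and `loadable` are hypothetical helpers.

```python
def select_by_type(records: list, record_type: str) -> list:
    """Keep only records tagged with the given record type."""
    return [r for r in records if r.get("RecordType") == record_type]


def loadable(records: list) -> list:
    """Keep only records that have not been marked as filtered."""
    return [r for r in records if not r.get("_Filtered", False)]


batch = [
    {"id": "A", "RecordType": "Cash"},
    {"id": "B", "RecordType": "Trade"},
    {"id": "C", "RecordType": "Cash", "_Filtered": True},
]

cash_only = select_by_type(batch, "Cash")  # Records A and C.
to_load = loadable(batch)                  # Records A and B; C stays visible in batch.
```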
Resolution
InstrumentResolver: Looks up each record’s instrument identifier in LUSID and attaches the resolved LUID to the record.
PortfolioResolver: Looks up each record’s portfolio code in LUSID and attaches the resolved LUSID scope and code to the record.
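Conceptually, instrument resolution is a lookup that splits records into two groups. The Python sketch below models this with a dictionary standing in for LUSID's instrument master. `resolve_instruments`, the `Isin` and `Luid` field names, and the identifier values are all made up for illustration.

```python
def resolve_instruments(records: list, luid_by_identifier: dict) -> tuple:
    """Attach a LUID where the identifier is known; return (resolved, unresolved)."""
    resolved, unresolved = [], []
    for record in records:
        luid = luid_by_identifier.get(record["Isin"])
        if luid is None:
            unresolved.append(record)  # Candidate for an instrument upsert.
        else:
            resolved.append({**record, "Luid": luid})
    return resolved, unresolved


# Placeholder identifiers, not real instruments.
KNOWN = {"ISIN-EXAMPLE-1": "LUID_EXAMPLE1"}
resolved, unresolved = resolve_instruments(
    [{"Isin": "ISIN-EXAMPLE-1"}, {"Isin": "ISIN-EXAMPLE-2"}], KNOWN
)
```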
Enrichment and quality
Enrichment: Adds extra fields to each record from a static mapping or API.
DQCheck: Validates records against configurable rules, for example checking a record has a required field or a value within an expected range.
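The two kinds of rule mentioned for DQCheck (required field, expected range) can be sketched in Python as follows. This illustrates the checks themselves, not the DQCheck configuration format; `check_record` and the field names are hypothetical.

```python
def check_record(record: dict, required: list, ranges: dict) -> list:
    """Return a list of human-readable rule failures; an empty list means the record passes."""
    failures = []
    for field in required:
        if record.get(field) in (None, ""):
            failures.append(f"missing required field: {field}")
    for field, (low, high) in ranges.items():
        value = record.get(field)
        # Only range-check numeric values; missing values are covered by the required rule.
        if isinstance(value, (int, float)) and not (low <= value <= high):
            failures.append(f"{field}={value} outside [{low}, {high}]")
    return failures


rules = {"required": ["Isin", "Price"], "ranges": {"Price": (0, 1000)}}
good = check_record({"Isin": "X", "Price": 101.5}, **rules)
bad = check_record({"Isin": "", "Price": -5}, **rules)
```

Returning every failure, not just the first, makes it easier to see all the problems with a record in one pass.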
Loading into LUSID
You can use the following processors to load records into LUSID:
UpsertTransaction
UpsertHoldings
UpsertInstrument
UpsertLegalEntities
UpsertCustomEntities
UpsertRelationalDatasets
File operations
SinkFile: Converts records into one of the following file types and writes the file to a specified location:
CSV
JSON
XML
Fixed-width
Transfer: Moves files between data sources, for example moving a file from Drive to an SFTP server.
Utility
Merge: Combines multiple input flows into one.
Query: Executes a parameterised SQL query against Postgres or Snowflake.
Workflow: Creates tasks in the Workflow Service from records.
Notification: Sends an email at the end of a run.
Template tokens
You can specify template tokens that LUSID populates at runtime for many configurable fields. There are two types of token:
| Template token example | Description | Use case |
| --- | --- | --- |
| SECRET:my-key | Populates a secret value from the Configuration Store. | Credentials, API keys, connection strings |
| DATE:T-1:yyyy-MM-dd | Populates a date value relative to the runtime. | File names and paths, effective dates |
Note: The integration does not populate templates for SQL queries in the Query processor or Postgres/Snowflake data sources.
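To illustrate what "relative to the runtime" means for a date token, the Python sketch below expands a token of the form shown in the example. It rests on several assumptions that the article does not confirm: that the offset counts calendar days (not business days), that `T+N` and a bare `T` are also valid, and that only the `yyyy`, `MM` and `dd` format specifiers appear. `resolve_date_token` is a hypothetical function, not part of LUSID.

```python
import re
from datetime import date, timedelta

# Maps the format specifiers used in the example token to strftime directives.
_FORMAT_PARTS = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}


def resolve_date_token(token: str, run_date: date) -> str:
    """Expand a DATE:T[+/-N]:format token relative to run_date (calendar days assumed)."""
    match = re.fullmatch(r"DATE:T(?:([+-])(\d+))?:(.+)", token)
    if match is None:
        raise ValueError(f"not a DATE token: {token}")
    sign, days, pattern = match.groups()
    offset = int(days) if days else 0
    if sign == "-":
        offset = -offset
    target = run_date + timedelta(days=offset)
    for spec, directive in _FORMAT_PARTS.items():
        pattern = pattern.replace(spec, directive)
    return target.strftime(pattern)


# A run on 1 March 2024 with a T-1 token yields the previous calendar day.
file_date = resolve_date_token("DATE:T-1:yyyy-MM-dd", date(2024, 3, 1))
```

With these assumptions, `file_date` is `"2024-02-29"`, which could then be substituted into a file name or path.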