How custom dataflow integrations work


LUSID empowers you to build custom dataflow integrations - a flexible, configuration-driven way to load data from third-party sources into LUSID.

Rather than writing custom code for each integration, you describe what you want to happen in a JSON document. LUSID reads the JSON and runs the dataflow pipeline for you, handling:

  • Ingestion

  • Transformation

  • Instrument resolution

  • Data quality checks

  • Loading into LUSID

This article explains the key concepts you should understand before configuring or modifying a custom dataflow integration.

Integration instance

You create each custom dataflow integration as an integration instance. The instance controls when the integration you’ve configured runs, whether it’s currently active, and what happens after the run finishes.

Custom dataflow integration instances have four key parts:

  • Dataflow definition: A configuration file that defines what data to process and how

  • Enabled status: An on/off switch - when set to false, triggers do not kick off runs of the instance

  • Triggers: One or more schedules that automate runs of the integration instance

  • Post-process tasks: After the run completes, you can:

    • Chain integrations together

    • Send a notification

    • Kick off a LUSID workflow

Dataflow definition

Each custom dataflow integration instance has a dataflow definition - the JSON file that configures what the integration does.

A dataflow definition is made up of one or more tasks that execute in an ordered sequence. Every task contains a pipeline of processors that also run in order. In other words:

  1. Raw data enters at one end.

  2. The data passes through a series of processors.

  3. Processed data emerges, ready to be loaded into LUSID.

You can control how a dataflow behaves in the event of a failure using the following fields:

  • MaxErrors: The number of errors at record-level to tolerate before stopping a run.

  • MaxTaskRetries: How many times to retry a failing task.
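As a rough mental model of how these two fields might interact, the following Python sketch simulates a task runner. It is not LUSID code: the function names, the TooManyErrors exception, and the exact semantics (stop once the error count exceeds the limit; retry the whole task a fixed number of times) are assumptions made purely for illustration.

```python
class TooManyErrors(Exception):
    """Raised when record-level errors exceed the tolerated maximum."""


def process_records(records, handler, max_errors):
    """Apply handler to each record, tolerating up to max_errors failures.

    Assumption: the run stops as soon as the error count exceeds max_errors.
    """
    processed, errors = [], []
    for record in records:
        try:
            processed.append(handler(record))
        except ValueError as exc:
            errors.append((record, str(exc)))
            if len(errors) > max_errors:
                raise TooManyErrors(
                    f"{len(errors)} errors exceed the limit of {max_errors}"
                )
    return processed, errors


def run_with_retries(task, max_task_retries):
    """Run task once, then retry up to max_task_retries more times on failure.

    Returns the task result and the number of attempts made.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > max_task_retries:
                raise


def parse_amount(record):
    # float() raises ValueError for values such as "n/a".
    return float(record["amount"])


rows = [{"amount": "10.5"}, {"amount": "n/a"}, {"amount": "3"}]
processed, errors = process_records(rows, parse_amount, max_errors=1)
```

With max_errors=1 the single bad row is recorded and the run continues; with max_errors=0 the same input would stop the run at the second row.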

Processors

A processor is a single step in a task’s pipeline, for example ingesting or transforming some data.

Processors read from and write to flows that pass records from one processor to the next using matching names. For example:

{ "ProcessorType": "Ingest",            "OutputFlows": { "default": "raw" } },
{ "ProcessorType": "MapTransform",      "InputFlows":  { "default": "raw" }, "OutputFlows": { "default": "mapped" } },
{ "ProcessorType": "UpsertTransaction", "InputFlows":  { "default": "mapped" } }

In the example above:

  1. Ingest writes to a flow called raw.

  2. MapTransform reads from raw and writes to a flow called mapped.

  3. UpsertTransaction reads from mapped.

Some processors produce more than one output flow. InstrumentResolver, for example, writes records to a resolved or unresolved flow. You could then route unresolved records to an UpsertInstrument processor while sending resolved records straight to UpsertTransaction.
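To make the routing idea concrete, here is a small Python simulation of a processor with two output flows. It is a sketch only: the record fields (isin, units), the flow names to-transactions and to-instruments, and the lookup table are hypothetical, and the real InstrumentResolver is configured in JSON rather than written as code.

```python
from collections import defaultdict


def resolve_instruments(records, known_instruments):
    """Split records into 'resolved' and 'unresolved' outputs."""
    outputs = {"resolved": [], "unresolved": []}
    for record in records:
        luid = known_instruments.get(record["isin"])
        if luid is None:
            outputs["unresolved"].append(record)
        else:
            # Attach the looked-up identifier to a copy of the record.
            outputs["resolved"].append({**record, "luid": luid})
    return outputs


def route(outputs, output_flows, flows):
    """Publish each processor output onto the named flow it is wired to."""
    for output_name, flow_name in output_flows.items():
        flows[flow_name].extend(outputs[output_name])


# Flows are just named queues of records shared between processors.
flows = defaultdict(list)
flows["mapped"] = [
    {"isin": "GB0001", "units": 100},
    {"isin": "XX9999", "units": 5},
]

outputs = resolve_instruments(flows["mapped"], {"GB0001": "LUID_0001"})
route(
    outputs,
    {"resolved": "to-transactions", "unresolved": "to-instruments"},
    flows,
)
```

A downstream processor would then name to-transactions or to-instruments as its input flow, in the same way that MapTransform names raw in the example above.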

Output modes

Every processor has an OutputMode that controls what happens to records as they leave that processor:

  • Stream: The output flow passes records to the next processor without writing anything to storage. You should use this output mode for processors where you only need the records to flow forwards (typically in the middle of a pipeline).

  • Persist: The output flow writes records to LUSID’s output-record store and stops the records there - no records flow forwards. You should use this output mode at the end of a pipeline when you need a lasting record of what was processed, and have no further processors to route records to.

  • PersistAndContinue: The output flow both writes records to storage and streams them to the next processor. You should use this output mode for mid-pipeline processors that need a record ID that is only available once the record has been stored, for example when a later processor needs to reference a previously-stored record.
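The difference between the three modes can be summarised in a few lines of Python. This is an illustrative model, not LUSID's implementation: the emit function and the in-memory store list are hypothetical stand-ins for a processor's output step and the output-record store.

```python
from enum import Enum


class OutputMode(Enum):
    STREAM = "Stream"
    PERSIST = "Persist"
    PERSIST_AND_CONTINUE = "PersistAndContinue"


def emit(records, mode, store):
    """Return the records that flow forwards; append persisted ones to store."""
    if mode in (OutputMode.PERSIST, OutputMode.PERSIST_AND_CONTINUE):
        store.extend(records)
    if mode is OutputMode.PERSIST:
        return []  # Persist stops the records at the store.
    return list(records)


store = []
records = [{"id": 1}, {"id": 2}]

streamed = emit(records, OutputMode.STREAM, store)
stored_only = emit(records, OutputMode.PERSIST, store)
both = emit(records, OutputMode.PERSIST_AND_CONTINUE, store)
```

After these three calls, streamed and both each contain the two records, stored_only is empty, and store holds four entries (two from each persisting mode).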

Processor types

Ingestion

Every pipeline must begin with an Ingest processor. This processor:

  1. Reads data from a source; available sources include:

    • A file in LUSID Drive

    • An SFTP server

    • An API

    • An AWS S3 bucket

    • ActiveMQ

    • MongoDB

    • PostgreSQL

    • Snowflake

    • LUSID itself

  2. Parses the data into records; available parsers include:

    • CSV

    • JSON

    • XML

    • Fixed-width

    • Excel (.xlsx and .xls)

    • FIX

    • SWIFT MT messages

  3. Passes the records on to the next processor
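As an illustration of the parsing step, the sketch below turns CSV text into a list of records using Python's standard library. It only shows what "parsing into records" means in general terms; the column names are made up, and the Ingest processor's actual parser is configured in the dataflow definition rather than coded by hand.

```python
import csv
import io


def parse_csv(text):
    """Parse CSV text into one dict per row, keyed by the header line."""
    return list(csv.DictReader(io.StringIO(text)))


records = parse_csv("TradeId,Amount\nT1,100\nT2,250\n")
```

Each row becomes a dictionary such as {"TradeId": "T1", "Amount": "100"}; note that a plain CSV parse leaves every value as a string, so any type conversion would happen in a later step.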

Transformation

The MapTransform processor allows you to:

  • Rename fields

  • Apply string transformations

  • Tag records with a type

  • Split a single record into multiple records, for example splitting an FX forward into buy/sell legs
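The operations above can be pictured with a short Python sketch. The function names, field names, and the choice of trim-and-uppercase as the string transformation are all hypothetical; in a real dataflow you would express the equivalent mapping in the MapTransform processor's JSON configuration.

```python
def map_transform(record, renames, record_type):
    """Rename fields, normalise string values, and tag the record type."""
    out = {}
    for key, value in record.items():
        if isinstance(value, str):
            value = value.strip().upper()  # example string transformation
        out[renames.get(key, key)] = value
    out["RecordType"] = record_type
    return out


def split_fx_forward(record):
    """Split one FX forward record into a buy leg and a sell leg."""
    common = {"TradeId": record["TradeId"], "RecordType": record["RecordType"]}
    buy = {
        **common,
        "Leg": "Buy",
        "Currency": record["BuyCurrency"],
        "Amount": record["BuyAmount"],
    }
    sell = {
        **common,
        "Leg": "Sell",
        "Currency": record["SellCurrency"],
        "Amount": record["SellAmount"],
    }
    return [buy, sell]


raw = {
    "trade_id": " fx-001 ",
    "buy_ccy": "gbp",
    "buy_amt": 1000.0,
    "sell_ccy": "usd",
    "sell_amt": 1270.0,
}
renames = {
    "trade_id": "TradeId",
    "buy_ccy": "BuyCurrency",
    "buy_amt": "BuyAmount",
    "sell_ccy": "SellCurrency",
    "sell_amt": "SellAmount",
}
mapped = map_transform(raw, renames, "FxForward")
legs = split_fx_forward(mapped)
```

Here one input record yields two output records that share a TradeId and RecordType but carry different currencies and amounts.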

Record types

Record types have two uses:

  • Processors can filter by record type, for example to make a SinkFile processor only write Cash record types to a file.

  • The LUSID dashboard uses record types for monitoring: the integration uses record type tags to populate status logs, from PayloadAcquired to ResourceLoaded.

You can set a record’s RecordType initially with the Ingest processor, or update it using MapTransform’s recordTypeMappings field.

You can set a special record type _Filtered = true on a record to pass it through processors without loading it into LUSID. This allows you to discard unwanted records without losing visibility of them.
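The following Python sketch shows one way to think about record-type filtering and the _Filtered flag. It assumes, for illustration only, that RecordType and _Filtered are plain fields on each record; the helper names are hypothetical and the real behaviour is driven by processor configuration.

```python
def filter_by_type(records, record_type):
    """Keep only records tagged with the given record type."""
    return [r for r in records if r.get("RecordType") == record_type]


def flag_filtered(records, is_unwanted):
    """Mark unwanted records instead of dropping them."""
    return [{**r, "_Filtered": True} if is_unwanted(r) else r for r in records]


def partition_for_load(records):
    """Separate records to load from flagged records that remain visible."""
    to_load = [r for r in records if not r.get("_Filtered")]
    skipped = [r for r in records if r.get("_Filtered")]
    return to_load, skipped


records = [
    {"Id": "A", "RecordType": "Cash", "Amount": 50},
    {"Id": "B", "RecordType": "Trade", "Amount": 0},
    {"Id": "C", "RecordType": "Cash", "Amount": 0},
]

cash_only = filter_by_type(records, "Cash")
flagged = flag_filtered(records, lambda r: r["Amount"] == 0)
to_load, skipped = partition_for_load(flagged)
```

In this model the zero-amount records B and C still travel through the pipeline and can be reported on, but only record A reaches the load step.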

Resolution

InstrumentResolver: Looks up each record’s instrument identifier in LUSID and attaches the resolved LUID to the record.

PortfolioResolver: Looks up each record’s portfolio code in LUSID and attaches the resolved LUSID scope and code to the record.

Enrichment and quality

Enrichment: Adds extra fields to each record from a static mapping or API.

DQCheck: Validates records against configurable rules, for example checking a record has a required field or a value within an expected range.
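To illustrate the two kinds of rule mentioned for DQCheck, here is a minimal Python sketch of a required-field check and a range check. The check_record function, its arguments, and the failure messages are hypothetical; actual DQCheck rules are defined in configuration and may behave differently.

```python
def check_record(record, required_fields, ranges):
    """Return a list of human-readable rule failures for one record."""
    failures = []
    for field in required_fields:
        if record.get(field) in (None, ""):
            failures.append(f"missing required field '{field}'")
    for field, (low, high) in ranges.items():
        value = record.get(field)
        if value is not None and not (low <= value <= high):
            failures.append(f"'{field}'={value} outside [{low}, {high}]")
    return failures


good = check_record(
    {"TradeId": "T1", "SettleDate": "2024-03-01", "Price": 101.5},
    required_fields=["TradeId", "SettleDate"],
    ranges={"Price": (0, 1_000_000)},
)
bad = check_record(
    {"TradeId": "T2", "Price": -5},
    required_fields=["TradeId", "SettleDate"],
    ranges={"Price": (0, 1_000_000)},
)
```

The first record passes with no failures; the second reports both a missing SettleDate and an out-of-range Price.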

Loading into LUSID

You can use the following processors to load records into LUSID:

  • UpsertTransaction

  • UpsertHoldings

  • UpsertInstrument

  • UpsertLegalEntities

  • UpsertCustomEntities

  • UpsertRelationalDatasets

File operations

SinkFile: Converts records into one of the following file types and writes the file to a specified location:

  • CSV

  • JSON

  • XML

  • Fixed-width

Transfer: Moves files between data sources, for example moving a file from Drive to an SFTP server.

Utility

Merge: Combines multiple input flows into one.

Query: Executes a parameterised SQL query against PostgreSQL or Snowflake.

Workflow: Creates tasks in the Workflow Service from records.

Notification: Sends an email at the end of a run.

Template tokens

You can specify template tokens that LUSID populates at runtime for many configurable fields. There are two types of token:

| Template token example | Description | Use case |
| --- | --- | --- |
| SECRET:my-key | Populates a secret value from the Configuration Store. | Credentials, API keys, connection strings |
| DATE:T-1:yyyy-MM-dd | Populates a date value relative to the runtime. | File names and paths, effective dates |

Note: The integration does not populate templates for SQL queries in the Query processor or in PostgreSQL and Snowflake data sources.
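To show how tokens of this shape could be expanded, the Python sketch below resolves the two example tokens. It is an approximation built on assumptions: that T-1 means one calendar day before the run date, that only the yyyy, MM and dd format parts are needed, and that secrets come from a simple dictionary standing in for the Configuration Store. LUSID's own resolution rules may differ.

```python
import re
from datetime import date, timedelta

# Map the date-format parts used in the example token to strftime directives.
_FORMAT_MAP = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d")]


def resolve_date_token(token, today):
    """Resolve 'DATE:T:<format>', 'DATE:T-<n>:<format>' or 'DATE:T+<n>:<format>'."""
    match = re.fullmatch(r"DATE:T(?:([+-])(\d+))?:(.+)", token)
    if match is None:
        raise ValueError(f"not a DATE token: {token!r}")
    sign, days, fmt = match.groups()
    offset = int(days) if days else 0
    if sign == "-":
        offset = -offset
    for pattern, directive in _FORMAT_MAP:
        fmt = fmt.replace(pattern, directive)
    return (today + timedelta(days=offset)).strftime(fmt)


def resolve_secret_token(token, secrets):
    """Resolve 'SECRET:<key>' from an in-memory stand-in for a secret store."""
    prefix, _, key = token.partition(":")
    if prefix != "SECRET" or key not in secrets:
        raise ValueError(f"cannot resolve secret token: {token!r}")
    return secrets[key]


run_date = date(2024, 3, 1)
file_date = resolve_date_token("DATE:T-1:yyyy-MM-dd", run_date)
api_key = resolve_secret_token("SECRET:my-key", {"my-key": "example-value"})
```

For a run on 1 March 2024, the date token resolves to the previous calendar day, which could then be substituted into a file name or an effective date field.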