Monitoring custom integrations by logging to the Integrations dashboard

FINBOURNE offers an External client application integration type, enabling you to externally host and manage applications that can interact with the LUSID Integrations logging dashboard.

This gives you the flexibility to develop integrations tailored to your preferred technology stack, while maintaining logging visibility through LUSID’s infrastructure. You can build bespoke LUSID integrations for the applications that matter to you by having your external application call the API and push your logs to LUSID.

Note that this integration type is a tool for storing and viewing logs for your custom processes - it does not upsert any LUSID investment data.

Prerequisites

Before getting started, ensure you have:

  • An active contract with the application you wish to build an integration for (if necessary)

  • Contacted your FINBOURNE representative to enable the External client application integration type in your LUSID domain

Setting up and running the integration

Each custom integration must have a unique integration instance ID. You’ll pass the integration instance ID into each run of your integration.

Step 1: Create an integration instance

  1. Navigate to Integrations > Dashboard.

  2. Select Create instance.

  3. Specify the following:

    • Name: A friendly name for this integration instance

    • Description: A detailed description of the instance

  4. Click Save to create the integration instance. Note the Instance ID, which you’ll need to run the integration.

Alternatively, call the CreateInstance API, passing in:

  • integrationType: external-client-application

  • name: A friendly name for your custom integration

  • description: A friendly description for your custom integration

  • enabled: true

For example:

curl -X POST "https://<your-domain>.lusid.com/horizon/api/integrations/instances" \
  -H "Authorization: Bearer <your-API-access-token>" \
  -H "Content-Type: application/json" \
  -d '{
    "integrationType": "external-client-application",
    "name": "Custom Integration 1",
    "description": "Logs for Custom Integration 1",
    "enabled": true,
    "triggers": [
      {
        "type": "",
        "cronExpression": "",
        "timeZone": ""
      }
    ],
    "details": {}
  }'

An example response is shown below. Note the integration instance id, which you’ll need to run the integration.

{
  "id": "04057004-a881-44bc-8af9-f95005f511bf"
}
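
The same request can be sketched in Python. The following minimal example only builds the request with the standard library so that it can be inspected before it is sent. The helper name build_create_instance_request is hypothetical, and the URL and body fields simply mirror the curl example above.

```python
import json
import urllib.request


def build_create_instance_request(domain, token, name, description):
    """Build (but do not send) a CreateInstance request mirroring the curl example."""
    body = {
        "integrationType": "external-client-application",
        "name": name,
        "description": description,
        "enabled": True,
        # Empty trigger and details values copied from the curl example
        "triggers": [{"type": "", "cronExpression": "", "timeZone": ""}],
        "details": {},
    }
    return urllib.request.Request(
        url=f"https://{domain}.lusid.com/horizon/api/integrations/instances",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_create_instance_request(
    "your-domain",
    "your-API-access-token",
    "Custom Integration 1",
    "Logs for Custom Integration 1",
)
print(request.get_method(), request.full_url)
```

To send the request, pass it to urllib.request.urlopen and read the id field from the JSON response.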

Step 2: Kick off a run of the integration instance

To insert information into the integration logs, you must first kick off a run of the integration instance and retrieve the run ID.

  1. Navigate to Integrations > Dashboard.

  2. Click Menu > Run on your integration instance.

  3. Navigate to Integrations > Runs.

  4. Locate the run and note the unique Run ID.

Alternatively, call the ExecuteInstance API, passing in the instance id from step 1. For example:

curl -X POST "https://<your-domain>.lusid.com/horizon/api/integrations/instances/04057004-a881-44bc-8af9-f95005f511bf/execute" \
  -H "Authorization: Bearer <your-API-access-token>" \
  -H "Content-Type: application/json"

An example response is shown below. Note the unique executionId (the run ID), which you’ll need to create logs in LUSID.

{
  "executionId": "7a76a76e-55a3-4655-8727-474c340dea06"
}
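
As a minimal sketch, an application might build the execute URL and pull the run ID out of the response as follows. The function names are hypothetical; the URL shape and the executionId field are taken from the example above.

```python
import json


def execute_url(domain, instance_id):
    """Return the ExecuteInstance URL for an integration instance."""
    return (
        f"https://{domain}.lusid.com/horizon/api/integrations/instances/"
        f"{instance_id}/execute"
    )


def parse_execution_id(response_text):
    """Extract the run ID from an ExecuteInstance response body."""
    payload = json.loads(response_text)
    execution_id = payload.get("executionId")
    if not execution_id:
        raise ValueError("Response did not contain an executionId")
    return execution_id


example_response = '{"executionId": "7a76a76e-55a3-4655-8727-474c340dea06"}'
run_id = parse_execution_id(example_response)
print(run_id)
```

Failing loudly when executionId is missing avoids sending logs to a malformed URL later on.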

Step 3: Configure your integration to upsert logging information via the InsertExternalLogs API

The final stage of the setup is highly customisable. It’s up to you to map your data to the appropriate log status.

To get the most out of this logging setup, it’s useful to understand what each log status indicates, although you remain free to map statuses to suit your own requirements. Read about what each log status means.
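
One way to keep this mapping explicit is a small lookup table in your application. The sketch below is illustrative only: the event names on the left are hypothetical, while the log levels and statuses on the right are the ones that appear in this article.

```python
# Hypothetical internal event names mapped to (loglevel, logstatus) pairs
STATUS_MAP = {
    "file_received": ("Payload", "PayloadAcquired"),
    "file_parsed": ("Payload", "PayloadExtracted"),
    "file_rejected": ("Payload", "PayloadFailed"),
    "record_saved": ("Resource", "ResourceLoaded"),
    "record_rejected": ("Resource", "ResourceFailed"),
}


def to_log_fields(event):
    """Translate an internal event name into the log level and status fields."""
    try:
        level, status = STATUS_MAP[event]
    except KeyError:
        raise ValueError(f"No log status mapped for event {event!r}") from None
    return {"loglevel": level, "logstatus": status}


print(to_log_fields("record_saved"))
```

Keeping the mapping in one place makes it easier to review and adjust as your requirements change.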

To insert logs for your integration to LUSID, you must configure your application to call the InsertExternalLogs API, passing in the integration instanceId (step 1) and runId (step 2) as parameters, and the following in the request body:

  • An identifying logId

  • A logLevel specifying a log type of Payload or Resource

  • A logStatus, indicating the current state of the integration, for example PayloadExtracted, ResourceLoaded, or ResourceFailed

  • A timestamp for the log

  • Optionally, any of the following supplementary fields:

    • An identifying parentLogId to group one or more logIds together

    • sourceRecordType

    • sourcePrimaryIdType

    • sourcePrimaryIdValue

    • targetRecordType

    • targetPrimaryIdType

    • targetPrimaryIdValue

    • targetRecordAction

    • message

    • messageType

For example, to insert logs for the integration instance run from steps 1 and 2:

curl -X POST "https://<your-domain>.lusid.com/horizon/api/logs/04057004-a881-44bc-8af9-f95005f511bf/7a76a76e-55a3-4655-8727-474c340dea06" \
  -H "Authorization: Bearer <your-API-access-token>" \
  -H "Content-Type: application/json" \
  -d '{
  "logs": [
    {
      "logid": 1,
      "parentlogid": 1,
      "loglevel": "Payload",
      "logstatus": "PayloadAcquired",
      "timestamp": "2025-01-22 23:12:13.1234"
    },
    {
      "logid": 2,
      "parentlogid": 1,
      "loglevel": "Payload",
      "logstatus": "PayloadExtracted",
      "timestamp": "2025-01-22 23:12:13.1234"
    },
    {
      "logid": 3,
      "parentlogid": 2,
      "loglevel": "Resource",
      "logstatus": "ResourceLoaded",
      "sourceprimaryidtype": "Ticker",
      "sourceprimaryidvalue": "MSFT",
      "targetprimaryidvalue": "MSFT",
      "timestamp": "2025-01-22 23:12:13.1234"
    },
    {
      "logid": 4,
      "parentlogid": 2,
      "loglevel": "Resource",
      "logstatus": "ResourceLoaded",
      "sourceprimaryidtype": "Ticker",
      "sourceprimaryidvalue": "AAPL",
      "targetprimaryidvalue": "AAPL",
      "timestamp": "2025-01-22 23:12:13.1234"
    }
  ]
}'
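
A request body of this shape can also be assembled in code. The following is a minimal sketch, not part of any SDK: the LogRecord class and helper names are hypothetical, the field names use the lowercase spelling from the curl example, and the timestamp helper assumes that UTC times in the same shape as the example value are acceptable.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional

ALLOWED_LEVELS = {"Payload", "Resource"}


def now_timestamp():
    """Current UTC time shaped like the example value '2025-01-22 23:12:13.1234'."""
    # %f yields six fractional digits; drop the last two to match the example
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-2]


@dataclass
class LogRecord:
    logid: int
    loglevel: str
    logstatus: str
    timestamp: str
    parentlogid: Optional[int] = None
    sourceprimaryidtype: Optional[str] = None
    sourceprimaryidvalue: Optional[str] = None
    targetprimaryidvalue: Optional[str] = None

    def __post_init__(self):
        # Reject anything other than the two documented log levels
        if self.loglevel not in ALLOWED_LEVELS:
            raise ValueError(f"loglevel must be one of {sorted(ALLOWED_LEVELS)}")


def build_body(records):
    """Serialise records to the request body shape, omitting unset optional fields."""
    return {
        "logs": [
            {key: value for key, value in asdict(record).items() if value is not None}
            for record in records
        ]
    }


stamp = now_timestamp()
records = [
    LogRecord(1, "Payload", "PayloadExtracted", stamp),
    LogRecord(2, "Resource", "ResourceLoaded", stamp, parentlogid=1,
              sourceprimaryidtype="Ticker", sourceprimaryidvalue="MSFT",
              targetprimaryidvalue="MSFT"),
]
body = build_body(records)
print(json.dumps(body, indent=2))
```

Validating the log level when the record is created surfaces mapping mistakes before the request is sent.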

After successfully inserting the logs to LUSID, you can view them in the LUSID web app via Integrations > Runs > {Run ID}.

Step 4: Update the integration instance run status

To update the run status to Completed, you must end the integration run by calling the StopInstanceExecution API, passing in the integration instanceId and runId:

curl -X POST "https://<your-domain>.lusid.com/horizon/api/runs/04057004-a881-44bc-8af9-f95005f511bf/7a76a76e-55a3-4655-8727-474c340dea06/stop" \
  -H "Authorization: Bearer <your-API-access-token>" \
  -H "Content-Type: application/json"
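
Because the run status only updates once the run is stopped, it can help to guarantee the stop call even when the work fails. The sketch below shows one possible pattern using try/finally. All three callables are hypothetical stand-ins for your own wrappers around the ExecuteInstance, InsertExternalLogs and StopInstanceExecution calls.

```python
def run_with_guaranteed_stop(start_run, do_work, stop_run):
    """Start a run, do the work, and always stop the run afterwards."""
    run_id = start_run()
    try:
        return do_work(run_id)
    finally:
        # Executes whether do_work returned normally or raised
        stop_run(run_id)


# Stand-in callables that record the order in which they are called
calls = []


def fake_start():
    calls.append("start")
    return "run-1"


def fake_work(run_id):
    calls.append(f"work:{run_id}")
    raise RuntimeError("simulated failure")


def fake_stop(run_id):
    calls.append(f"stop:{run_id}")


try:
    run_with_guaranteed_stop(fake_start, fake_work, fake_stop)
except RuntimeError:
    pass

print(calls)  # ['start', 'work:run-1', 'stop:run-1']
```

The original exception still propagates to the caller, so failures are not hidden by the clean-up step.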

Monitoring and troubleshooting integration runs

See how to monitor integrations and troubleshoot general issues.

Example: Using the Horizon Python SDK to create a log per instrument upserted to LUSID

The following example adds friendly logging to the instrument upsert operation introduced in this Jupyter notebook. The example uses the Horizon and LUSID Python SDKs to:

  1. Perform a one-time creation of an integration instance.

  2. Upsert the instruments to LUSID, alongside a log for each instrument.

  3. Handle any errors.

# Set up LUSID and Horizon API
import pandas as pd
import logging
logging.basicConfig(level = logging.INFO)

import lusid as lu
import lusid.api as la
import lusid.models as lm
from lusidjam import RefreshingToken
import lusid.extensions as le
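from finbourne_sdk_utils.lpt.lpt import to_date
# Helper used later to convert the upsert response to a dataframe
from finbourne_sdk_utils.pandas_utils.lusid_pandas import lusid_response_to_data_frame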

lusid_config_loaders=[
    le.ArgsConfigurationLoader(api_url="https://<your-domain>.lusid.com/api" ,access_token = RefreshingToken(), app_name = "LusidJupyterNotebook"),
    le.EnvironmentVariablesConfigurationLoader()]
lusid_api_factory = le.SyncApiClientFactory(config_loaders=lusid_config_loaders)

# Set pandas display options
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
pd.options.display.float_format = "{:,.2f}".format

import finbourne_horizon as horizon
from finbourne_horizon.models.external_log_insertion_request import ExternalLogInsertionRequest
from finbourne_horizon.models.external_log_record import ExternalLogRecord
from finbourne_horizon.exceptions import ApiException
from finbourne_horizon.extensions.configuration_options import ConfigurationOptions
from finbourne_horizon.models import *
from pprint import pprint

horizon_config_loaders=[
    horizon.ArgsConfigurationLoader(api_url = "https://<your-domain>.lusid.com/horizon", access_token = RefreshingToken(), app_name = "LusidJupyterNotebook"),
    horizon.EnvironmentVariablesConfigurationLoader()
    ]
horizon_api_factory = horizon.SyncApiClientFactory(config_loaders=horizon_config_loaders)

integrations_api = horizon_api_factory.build(horizon.IntegrationsApi)
logs_api = horizon_api_factory.build(horizon.LogsApi)
runs_api = horizon_api_factory.build(horizon.RunsApi)

# Obtain the Instruments API
instruments_api=lusid_api_factory.build(la.InstrumentsApi)

# Read assets into pandas dataframe from securities file and show it
assets_df = pd.read_csv("data/assets.csv", keep_default_na = False)
display(assets_df)

One-time setup of integration instance:

# ONE-TIME SETUP: Create integration instance (run this cell only once, then comment it out)
# Empty trigger values mirror the curl example in step 1
triggers = [Trigger(type="", cron_expression="", time_zone="")]

try:
    create_instance_request = CreateInstanceRequest(
        name="Instrument Upsert Integration",
        description="Logs for instrument upsert operations from Jupyter notebook",
        integration_type="external-client-application",
        triggers=triggers,
        details={},  # Empty object, as in the curl example in step 1
        enabled=True
    )

    instance_response = integrations_api.create_instance(create_instance_request=create_instance_request)
    instance_id = instance_response.id
    print(f"Created integration instance with ID: {instance_id}")
    print("IMPORTANT: Save this instance_id and use it in the next cell")
except horizon.ApiException as e:
    print(f"Error creating instance: {e}")

Kick off a run of the instance, then log the results or handle any errors:

# After running the above once, paste your instance_id here:
instance_id = "b13f175e-c50a-443d-ba0d-de26b5ad8f0b"  # Replace with your actual instance ID

print(f"Using instance ID: {instance_id}")

# Create a dictionary of instrument definitions from asset data
definitions = {}

# Iterate over each row in the assets dataframe
for index, asset in assets_df.iterrows():
    
    # Map identifier columns to case-sensitive LUSID identifier names       
    identifiers = {
        # Unique identifiers
        "Figi": lm.InstrumentIdValue(value = asset["figi"]),
        "ClientInternal": lm.InstrumentIdValue(value = asset["internal_id"]),
        # Non-unique identifiers
        "Isin": lm.InstrumentIdValue(value = asset["isin"]),
        "Ticker": lm.InstrumentIdValue(value = asset["ticker"])  
    }
                                       
    # Model equities
    if asset["security_type"] == "equity":
        # Create definitions
        definitions[asset["instrument_name"]] = lm.InstrumentDefinition(
            name = asset["instrument_name"],
            identifiers = identifiers,
            definition = lm.Equity(
                instrument_type = "Equity",
                dom_ccy = asset["currency"],
                identifiers = {}
            )
        )
    # Model bonds
    elif asset["security_type"] == "govt bond":
        definitions[asset["instrument_name"]] = lm.InstrumentDefinition(
            name = asset["instrument_name"],
            identifiers = identifiers,
            definition = lm.Bond(
                instrument_type = "Bond",
                start_date = "2021-01-01T00:00:00Z",
                maturity_date = asset["maturity_date"],
                dom_ccy = asset["currency"],
                flow_conventions = lm.FlowConventions(
                    currency = asset["currency"],
                    payment_frequency = "6M",
                    day_count_convention = "ActualActual",
                    roll_convention = "NoAdjustment",
                    payment_calendars = [],
                    reset_calendars = [],
                    settle_days = 0,
                    reset_days = 0
                ),
                principal = 1,
                coupon_rate = float(asset["coupon"])
            )
        )

# Start an integration run to get a run ID
try:
    run_response = integrations_api.execute_instance(instance_id=instance_id)
    run_id = run_response.execution_id
    print(f"Started integration run with ID: {run_id}")
except horizon.ApiException as e:
    print(f"Error starting integration run: {e}")
    raise

try:
    # Upsert instruments to a custom scope in LUSID
    upsert_instruments_response = instruments_api.upsert_instruments(
        request_body = definitions, 
        scope = "external-client-application",
    )
    
    # Transform API response to a dataframe
    upsert_instruments_response_df = lusid_response_to_data_frame(
        list(upsert_instruments_response.values.values())
    )
    
    # Display the results
    display(upsert_instruments_response_df[["name", "lusidInstrumentId"]])
    
    # Log successful upserts - one log entry per instrument
    logs_to_insert = []
    
    for index, row in upsert_instruments_response_df.iterrows():
        message = f"Successfully upserted instrument: {row['name']}"
        log_record = ExternalLogRecord(
            logid=index + 1,
            loglevel="Resource",
            logstatus="ResourceLoaded",
            sourcerecordtype="Instrument",
            sourceprimaryidtype="InstrumentName",
            sourceprimaryidvalue=row["name"],
            targetprimaryidtype="LusidInstrumentId",
            targetprimaryidvalue=row["lusidInstrumentId"],
            targetrecordtype="LUSID",
            targetrecordaction="Upsert",
            message=message[:1024],
            timestamp="2025-01-22 23:12:13.1234"
        )
        logs_to_insert.append(log_record)
    
    # Insert logs to Horizon
    external_log_insertion_request = ExternalLogInsertionRequest(logs=logs_to_insert)
    
    logs_response = logs_api.insert_external_logs(
        instanceid=instance_id,
        runid=run_id,
        external_log_insertion_request=external_log_insertion_request
    )
    
    print(f"Successfully logged {len(logs_to_insert)} instrument upserts to Horizon")
    
    # Complete the integration run successfully
    try:
        runs_api.stop_instance_execution(instance_id=instance_id, run_id=run_id)
        print(f"Stopped integration run {run_id}")
    except horizon.ApiException as e:
        print(f"Warning: Could not stop integration run: {e}")
    
except lu.ApiException as e:
    # Log the failure
    print(f"Failed to upsert instruments: {e}")
    
    error_message = f"Failed to upsert instruments to LUSID: {str(e)}"
    error_log = ExternalLogRecord(
        logid=1,
        loglevel="Resource",
        logstatus="ResourceFailed",
        sourcerecordtype="InstrumentUpsert",
        message=error_message[:1024],
        timestamp="2025-01-22 23:12:13.1234"
    )
    
    external_log_insertion_request = ExternalLogInsertionRequest(logs=[error_log])
    
    try:
        logs_response = logs_api.insert_external_logs(
            instanceid=instance_id,
            runid=run_id,
            external_log_insertion_request=external_log_insertion_request
        )
        print("Failure logged to Horizon")
        
        # Stop the integration run
        runs_api.stop_instance_execution(instance_id=instance_id, run_id=run_id)
        print(f"Stopped integration run {run_id}")
        
    except horizon.ApiException as log_error:
        print(f"Failed to log error to Horizon: {log_error}")
    
    # Re-raise the original exception
    raise

except Exception as e:
    # Catch any other unexpected errors
    print(f"Unexpected error: {e}")
    
    error_message = f"Unexpected error during instrument upsert: {str(e)}"
    error_log = ExternalLogRecord(
        logid=1,
        loglevel="Payload",
        logstatus="PayloadFailed",
        sourcerecordtype="InstrumentUpsert",
        message=error_message[:1024],
        timestamp="2025-01-22 23:12:13.1234"
    )
    
    external_log_insertion_request = ExternalLogInsertionRequest(logs=[error_log])
    
    try:
        logs_response = logs_api.insert_external_logs(
            instanceid=instance_id,
            runid=run_id,
            external_log_insertion_request=external_log_insertion_request
        )
        print("Failure logged to Horizon")
        
        # Stop the integration run
        runs_api.stop_instance_execution(instance_id=instance_id, run_id=run_id)
        print(f"Stopped integration run {run_id}")
        
    except horizon.ApiException as log_error:
        print(f"Failed to log error to Horizon: {log_error}")
    
    raise
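
The two error branches in the script build almost identical failure records, so they could share a helper. The following is a minimal sketch that uses a plain dictionary so it runs without the SDK. The function name is hypothetical, and the 1024-character limit simply mirrors the message[:1024] truncation in the script.

```python
MAX_MESSAGE_LENGTH = 1024  # Mirrors the message[:1024] truncation in the script


def build_failure_log(error, loglevel, timestamp, logid=1):
    """Build the fields for a failure log record at the given log level."""
    if loglevel not in ("Payload", "Resource"):
        raise ValueError("loglevel must be 'Payload' or 'Resource'")
    message = f"Failed to upsert instruments to LUSID: {error}"
    return {
        "logid": logid,
        "loglevel": loglevel,
        # Yields PayloadFailed or ResourceFailed, the statuses used in the script
        "logstatus": f"{loglevel}Failed",
        "sourcerecordtype": "InstrumentUpsert",
        "message": message[:MAX_MESSAGE_LENGTH],
        "timestamp": timestamp,
    }


failure = build_failure_log(
    RuntimeError("x" * 2000), "Resource", "2025-01-22 23:12:13.1234"
)
print(failure["logstatus"], len(failure["message"]))
```

The dictionary keys match the keyword arguments the script passes to ExternalLogRecord, so the result could presumably be unpacked into that model, although that is an assumption rather than documented behaviour.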

Once the script has run, you can check the logs by navigating to Integrations > Runs > {Run ID}.