NAV
python

Welcome to integrate.ai

Activate your data silos

We help organizations integrate AI to solve the world’s most important problems without risking the world’s most sensitive data. Safely collaborate on siloed data across organizations, jurisdictions, and environments to unlock mission critical analytics and AI use cases.

Unlock quality data at scale

Collaborate within and across organizations, jurisdictions, and environments without moving data

Reduce collaboration effort

Efficiently collaborate without copying, centralizing, or transferring data directly

Protect privacy & trust

Avoid compliance hurdles through a trusted platform with baked-in security, safety, and privacy

Developer tools for federated learning and analytics

Machine learning and analytics on distributed data require complex infrastructure and tooling that drain valuable developer resources. Integrate.ai abstracts away the complexity, making it easier than ever to integrate federated learning and analytics into your product.

Learn more about our flexible architecture options and expansive data science tooling.

System Overview

integrate.ai provides a robust and seamless system that removes the infrastructure and administration challenges of distributed machine learning and enables you to integrate it directly into your existing workflow. With robust privacy-enhancing technologies like federated learning and differential privacy, integrate.ai allows you to train models and run analytics across distributed data without compromising data privacy.

Our platform consists of the following components:

  1. The core IAI SaaS backend - includes federated learning aggregation capabilities, tenant management, control-plane system orchestration, and observability.
  2. A customer-facing web portal that supports general administration of the customer tenant.
  3. An SDK with a REST API that provides an interface into the centralized server. It also supports notebook-based ad hoc discovery and exploratory analysis.

Platform Architecture Overview

GETTING STARTED

Workspace Configuration

integrate.ai offers a cloud-hosted, fully managed system for users focused on model building, training, and analysis. In this deployment scenario, the integrate.ai system manages all components hosted in the customer's environment using a limited permission role granted by the customer.

The initial setup for your workspace should be performed by an AWS or Microsoft Azure Administrator. This Administrator must have rights neceesary to configure the permissions and access needed by the cloud-based task runners. Follow the steps in the Using integrate.ai section for either AWS Environments or Azure Environments to generate the roles and policies or service principal needed to create a task runner.

Install the SDK locally to run tasks through notebooks using task runners.

Next, create a task runner through the workspace UI, then register a dataset, and start training sessions without having to configure additional infrastructure.

Administrator Workflow:

  1. Configure your cloud environment.
  2. Provide the required info to the integrate.ai Workspace Administrator.

Workspace Administrator Workflow:

  1. Create an AWS task runner or create an Azure task runner for the data scientists to use to perform model training.

Data Scientist Workflow:

  1. Install the integrate.ai SDK on your local machine.
  2. Download any sample notebooks and data you want to test.
  3. For EDA, register a dataset with a task runner and start training.

For Developers

For those who are interested in integrating the system with their own product, or writing software that makes use of integrate.ai capabilities, there are several options, including a RESTful API and full-featured SDK.

For instructions on installing the SDK and necessary components for local development, see Installing the SDK. Refer to the API Documentation for details of the functions and operations.

Data Requirements

To run a training session, the data must be prepared according to the following standards:

integrate.ai supports tabular data for standard models. You can create custom dataloaders for custom models, which allow for image and video data inputted as a folder instead a flat file. For more information about dataloaders, see Building a custom model.

Custom models can also be used for feature engineering or preprocessing.

GLM and FFNet Model Data Requirements

integrate.ai supports horizontal federated learning (HFL) and vertical federated learning (VFL) to train models across different siloed datasets (or data silos) without transferring data between each silo.

To train an HFL model, datasets within each data silo that will participate in an HFL training session need to be prepared according to the following requirements:

  1. Data should be in Apache Parquet format (recommended) or .csv, and can be hosted locally, in an S3 bucket, or in an Azure blob.
  2. For custom models, the data can be in either a single file OR folder format.
  3. Data must be fully feature engineered. Specifically, the data frame has to be ready to be used as an input for a neural network. This means that: a. All columns must be numerical b. Columns must not contain NULL values (missing values are imputed) c. Categorical variables are properly encoded (for example, by one-hot-encoding) d. Continuous variables are normalized to have mean = 0 and std = 1
  4. Feature engineering must be consistent across the silos. For example, if the datasets contain categorical values, such as postal codes, these values must be encoded the same way across all the datasets. For the postal code example, this means that the same postal code value translates to the same numerical values across all datasets that will participate in the training.
  5. Column names must be consistent across datasets. All column names (predictors and targets) must contain only alphanumeric characters (letters, numbers, dash -, underscore _) and start with a letter. You can select which columns you want to use in a specific training session.

USING INTEGRATE.AI

integrate.ai provides a safe data collaboration platform that enables data science leaders to activate data silos by moving models, not data. It democratizes usage of the cloud by taking care of all the computational aspects of deployment — from container orchestration and deployment, to security and failovers. By harnessing the power of cloud computing and AI we enable data scientists and R&D professionals, with limited-to-no computational and machine learning training, to analyse and make sense of their data in the fastest and safest way possible.

The image below illustrates the high-level system architecture and how the managed environments interact.

The task runners use the serverless capabilities of cloud environments (such as AWS Batch and Fargate). This greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

AWS Environments

AWS Task Runner Configuration

Before you get started using integrate.ai in your cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud, by creating a limited permission Role in AWS for the provisioner and for the runtime agent. This is a one-time process - once created, you can use these roles for any task runners in your environment.

This section walks you through the required configuration.

  1. Create a provisioner role and policy.
  2. Create a runtime role and policy.
  3. Create a task runner.
  4. Use the task runner in a notebook to perform training tasks.

Create AWS IAM Provisioner Role and Policy

# Provisioner Custom trust policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::919740802015:role/iai-taskrunner-provision-<workspace-name>-prod-batch-ecs-task-role"
                ]
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ],
            "Condition": {}
        }
    ]
}

You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a limited permission Role in AWS for the provisioner.

Provisioner Role and policy

This policy lists all of the required permissions for integrate.ai to create the necessary infrastructure. Once your task runner is online, you can disable this role for extra security. However, disabling this role prevents you from creating new task runners and deleting existing ones.

The provisioner creates the following components and performs the required related tasks:

To create the provisioner role and policy:

  1. In the AWS Console, go to IAM, select Policies, and click Create policy.
  2. On the Specify permissions page, click the JSON tab.
  3. Paste in the Runtime JSON policy provided below by integrate.ai. Do not edit this policy.

    Provisioner JSON policy - do not edit. Click to expand.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "IAMPermissions", "Effect": "Allow", "Action": [ "iam:CreateInstanceProfile", "iam:CreateServiceLinkedRole", "iam:GetInstanceProfile", "iam:RemoveRoleFromInstanceProfile", "iam:DeleteInstanceProfile", "iam:AddRoleToInstanceProfile", "iam:CreatePolicy", "iam:CreateRole", "iam:GetPolicy", "iam:GetRole", "iam:GetPolicyVersion", "iam:ListRolePolicies", "iam:ListAttachedRolePolicies", "iam:ListPolicyVersions", "iam:ListInstanceProfilesForRole", "iam:DeletePolicy", "iam:DeleteRole", "iam:AttachRolePolicy", "iam:PassRole", "iam:DetachRolePolicy" ], "Resource": "*" }, { "Sid": "BatchPermissions", "Effect": "Allow", "Action": [ "batch:RegisterJobDefinition", "batch:DeregisterJobDefinition", "batch:CreateComputeEnvironment", "batch:UpdateComputeEnvironment", "batch:DeleteComputeEnvironment", "batch:UpdateJobQueue", "batch:CreateJobQueue", "batch:DeleteJobQueue", "batch:DescribeComputeEnvironments", "batch:DescribeJobDefinitions", "batch:DescribeJobQueues" ], "Resource": "*" }, { "Sid": "CloudWatchPermissions", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups", "logs:ListTagsLogGroup", "logs:CreateLogGroup", "logs:DescribeLogGroups", "logs:DeleteLogGroup", "logs:TagResource" ], "Resource": "*" }, { "Sid": "ECSFargatePermissions", "Effect": "Allow", "Action": [ "ecs:CreateCluster", "ecs:DescribeClusters", "ecs:DeleteCluster", "ecs:RegisterTaskDefinition", "ecs:DescribeTaskDefinition", "ecs:DeregisterTaskDefinition" ], "Resource": "*" }, { "Sid": "KmsPermissions", "Effect": "Allow", "Action": [ "kms:ListAliases", "kms:CreateKey", "kms:CreateAlias", "kms:DescribeKey", "kms:GetKeyPolicy", "kms:GetKeyRotationStatus", "kms:ListResourceTags", "kms:ScheduleKeyDeletion", "kms:CreateGrant", "kms:ListGrants", "kms:RevokeGrant", "kms:DeleteAlias" ], "Resource": "*" }, { "Sid": "S3CreatePermissions", "Effect": "Allow", "Action": [ "s3:CreateBucket", "s3:DeleteBucket", "s3:DeleteBucketPolicy", "s3:PutBucketVersioning", "s3:PutBucketPublicAccessBlock", "s3:PutBucketVersioning", "s3:PutEncryptionConfiguration" ], "Resource": "*" }, { "Sid": "S3ReadPermissions", "Effect": "Allow", "Action": [ "s3:GetBucketCors", "s3:GetBucketPolicy", "s3:PutBucketPolicy", "s3:GetBucketWebsite", "s3:GetBucketVersioning", "s3:GetLifecycleConfiguration", "s3:GetAccelerateConfiguration", "s3:GetBucketRequestPayment", "s3:GetBucketLogging", "s3:GetBucketPublicAccessBlock", "s3:GetBucketAcl", "s3:GetBucketObjectLockConfiguration", "s3:GetReplicationConfiguration", "s3:GetBucketTagging", "s3:GetEncryptionConfiguration", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::*integrate.ai", "arn:aws:s3:::*integrate.ai/*" ] }, { "Sid": "VPCCreatePermissions", "Effect": "Allow", "Action": [ "ec2:CreateVpc", "ec2:CreateTags", "ec2:AllocateAddress", "ec2:ReleaseAddress", "ec2:CreateSubnet", "ec2:ModifySubnetAttribute", "ec2:RevokeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:CreateRouteTable", "ec2:CreateRoute", "ec2:CreateInternetGateway", "ec2:AttachInternetGateway", "ec2:AssociateRouteTable", "ec2:ModifyVpcAttribute", "ec2:CreateSecurityGroup", "ec2:CreateNatGateway" ], "Resource": "*" }, { "Sid": "VPCDescribePermissions", "Effect": "Allow", "Action": [ "ec2:DescribeVpcs", "ec2:DescribeSubnets", "ec2:DescribeSubnets", "ec2:DescribeVpcAttribute", "ec2:DescribeVpcClassicLinkDnsSupport", "ec2:DescribeVpcClassicLink", "ec2:DescribeInternetGateways", "ec2:DescribeSecurityGroups", "ec2:DescribeSecurityGroupRules", "ec2:DescribeRouteTables", "ec2:DescribeNetworkAcls", "ec2:DescribeNetworkInterfaces", "ec2:DescribeNatGateways", "ec2:DescribeAddresses" ], "Resource": "*" }, { "Sid": "VPCDeletePermissions", "Effect": "Allow", "Action": [ "ec2:DeleteSubnet", "ec2:DisassociateRouteTable", "ec2:DeleteSecurityGroup", "ec2:DeleteRoute", "ec2:DeleteNatGateway", "ec2:DeleteRouteTable", "ec2:DisassociateAddress", "ec2:DetachInternetGateway", "ec2:DeleteInternetGateway", "ec2:DeleteVpc" ], "Resource": "*" } ] }

  4. Click Next.

  5. Name the policy and click Create policy.

  6. In the left navigation bar, select Roles, and click Create role.

  7. Select Custom trust policy.

  8. Paste in the custom trust relationship JSON provided by integrate.ai (shown in the code panel on the right).

  9. Replace the workspace-name with the value for your workspace provided by integrate.ai.

  10. Click Next.

  11. On the Add permissions page, search for and select the policy you just created.

  12. Click Next.

  13. Provide the following Role name: iai-taskrunner-provisioner. Important: Do not edit or change this name.

  14. Click Create role.

Copy and save the ARN for the provisioner role. It is required for creating a task runner.

Create AWS IAM Runtime Role and Policy

You must grant integrate.ai access to run the task runner in your cloud environment by creating a limited permission Role in AWS for the runtime agent.

Runtime Role and policy:

# Runtime Custom trust policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::919740802015:role/IAI-API-Instance-<workspace-name>-prod"
                ]
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ],
            "Condition": {}
        }
    ]
}

This policy requires fewer permissions, compared to the Provisioner role. For increased security, the runtime role has much less access. It is used to run training tasks.

To create the runtime policy and role:

  1. In the AWS Console, go to IAM, select Policies, and click Create policy.
  2. On the Specify permissions page, click the JSON tab.
  3. Paste in the Runtime JSON policy provided below by integrate.ai.

Runtime JSON policy - do not edit. Click to expand.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowBatchDescribeJobs", "Effect": "Allow", "Action": [ "batch:DescribeJobs", "batch:TagResource" ], "Resource": "" }, { "Sid": "AllowBatchAccess", "Effect": "Allow", "Action": [ "batch:TerminateJob", "batch:SubmitJob", "batch:TagResource", "batch:CancelJob" ], "Resource": [ "arn:aws:batch:::job-definition/iai-fl-client-batch-job-", "arn:aws:batch:::job-queue/iai-fl-client-batch-job-queue-", "arn:aws:batch:::job/" ] }, { "Sid": "AllowECSUpdateAccess", "Effect": "Allow", "Action": [ "ecs:DescribeContainerInstances", "ecs:DescribeTasks", "ecs:ListTasks", "ecs:UpdateContainerAgent", "ecs:StartTask", "ecs:StopTask", "ecs:RunTask" ], "Resource": [ "arn:aws:ecs:::cluster/iai-fl-server-ecs-cluster-", "arn:aws:ecs:::task/iai-fl-server-ecs-cluster-", "arn:aws:ecs:::task-definition/iai-fl-server-fargate-job-" ] }, { "Sid": "AllowECSReadAccess", "Effect": "Allow", "Action": [ "ecs:DescribeTaskDefinition" ], "Resource" : [""] }, { "Sid": "AllowPassRole", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam:::role/iai-fl-server-fargate-task-role--", "arn:aws:iam:::role/iai-fl-server-fargate-execution-role--" ] }, { "Sid": "AllowSaveTokens", "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:DescribeParameters", "ssm:GetParameters", "ssm:GetParameter" ], "Resource": [ "arn:aws:ssm:::parameter/fl-server--token", "arn:aws:ssm:::parameter/fl-client--token" ] }, { "Sid": "AllowS3Access", "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject", "s3:GetObjectAcl", "s3:GetObjectVersion", "s3:ListBucketVersions" ], "Resource": [ "arn:aws:s3:::.integrate.ai", "arn:aws:s3:::.integrate.ai/" ] }, { "Sid": "AllowKMSUsage", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:Encrypt", "kms:GenerateDataKey" ], "Resource": "", "Condition": { "ForAnyValue:StringLike": { "kms:ResourceAliases": "alias/iai/" } } }, { "Sid": "AllowLogReading", "Effect": "Allow", "Action": [ "logs:Describe", "logs:List*", "logs:StartQuery", "logs:StopQuery", "logs:TestMetricFilter", "logs:FilterLogEvents", "logs:Get*" ], "Resource": [ "arn:aws:logs:::log-group:iai-fl-server-fargate-log-group-:log-stream:ecs/fl-server-fargate/", "arn:aws:logs:::log-group:/aws/batch/job:log-stream:iai-fl-client-batch-job-*" ] } ] }

  1. Click Next.
  2. Name the policy and click Create policy.
  3. In the left navigation bar, select Roles, and click Create role.
  4. Select Custom trust policy.
  5. Paste in the custom trust relationship JSON provided by integrate.ai (shown in the code panel on the right).
  6. Replace the workspace-name with the value for your workspace provided by integrate.ai.
  7. Click Next.
  8. On the Add permissions page, search for and select the policy you just created.
  9. Click Next.
  10. Provide the following Role name: iai-taskrunner-runtime Important: Do not edit or change this name.
  11. Click Create role.

Copy and save the ARN for the runtime role. It is required for creating a task runner.

Role and policy configuration is now complete.

Create an AWS Task Runner

Task runners simplify the process of running training sessions on your data.

Note: before attempting to create a task runner, ensure your Administrator has completed the AWS Task Runner Configuration.

To create an AWS task runner:

  1. Log in to your integrate.ai workspace.
  2. Click your name in the upper right corner to access the menu.
  3. Click Settings.
  4. In the left-hand navigation, under Workspace, click Task Runners.
  5. Click Connect new to start creating a task runner.
  6. Select the service provider - Amazon Web Services.
  7. Provide the following information:
  1. Click Save.
  2. Wait for the status to change to Online. Note that you may have to refresh the page to see the updated status.

Q: What if task runner creation fails?

A: Delete the task runner and try again. Make sure that you have specified the correct ARN for both the provisioner and runtime roles with the integrate.ai policy applied.

After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there's no need to create a new one for each task.

Running a training task with an AWS Task Runner

The integrateai_taskrunner_AWS.ipynb notebook provides examples of how to use an AWS task runner for different federated learning tasks.

You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client or Server, as the Task Runner manages the client and server operations for you in your cloud environment.

Download the notebooks here, and the sample data here.

Register a dataset

For Exploratory Data Analysis (EDA), register your dataset through the UI, by following the steps below.

  1. Log in to your integrate.ai workspace.
  2. Click Library in the left navigation bar.
  3. On the Datasets tab, click Register dataset.
  4. Select a task runner to manage tasks related to your dataset. Note: If no task runners exist, ask your Workspace Administrator to create one.
  5. Click Next.
  6. On the Dataset details and privacy controls page, type a name and description for the dataset.
  7. Specify the URI of the dataset, using the s3:// format.
  8. (Optional) If you have metadata to associate with the dataset, upload it in the Attachments section.
  9. To allow Custom Models to use your dataset, enable the Custom models toggle.
  10. Click Connect.

Your dataset is now registered and can be used in a notebook.

Specify a registered dataset with an AWS task runner

eda_task_group_context = (
        SessionTaskGroup(eda_session) \
        .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
        // registered dataset
        .add_task(iai_tb_aws.eda(dataset_name="silo0"))\  
        //non-registered dataset
        .add_task(iai_tb_aws.eda(dataset_name="dataset_two", dataset_path=passive_train_path))\
        .start()
    )

After you have registered your dataset with a task runner, you only need to specify the dataset name in the task to be done.

In this example code for creating a task group, the first task is running the server. The second task is starting a client that uses a registered dataset. The third task is starting a client with a non-registered dataset; in this case, you must specify both dataset name and path.

Microsoft Azure Environments

Azure Task Runner Configuration

Before you get started using integrate.ai in your Microsoft Azure cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud, by creating a dedicated resource group and a limited permission service principal for provisioning a task runner. The provisioner automatically creates a second service principal to execute Azure tasks using the task runner. This is a one-time process - once created, you can use this infrastructure for any task runners in your environment.

This section walks you through the required configuration.

  1. Create a Resource Group and provisioner service principal.
  2. Create a task runner.
  3. Use the task runner in a notebook to perform training tasks.

Create an Azure Resource Group & Service Principal

# Example Minimum Permission Policy Requirements for the Provisioner Service Principal
{
    "properties": {
        "roleName": "iai-provisioner",
        "description": "Provision new task runner",
        "assignableScopes": ["/"],
        "permissions": [
            {
                "actions": [
                   "*/read",
                    "Microsoft.Authorization/roleAssignments/write",
                    "Microsoft.Storage/storageAccounts/write",
                    "Microsoft.Storage/storageAccounts/listKeys/action",
                    "Microsoft.Storage/storageAccounts/delete"
                ],
                "notActions": [],
                "dataActions": [],
                "notDataActions": []
            }
        ]
    }
}

You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a dedicated resource group and service principal for the provisioner agent. A dedicated resource group is required for integrate.ai to provision resources into. You must provide the credentials for the service principal as part of the task runner creation process.

In order to provide all the necessary permissions, the user who creates the resource group and provisioner service principal must be an Azure AD Administrator.

Internally, integrate.ai will create a second service principal with reduced permissions to run tasks within the previously created resource group.

Permission Requirements for Task Runner Provisioner

  1. If not already available, install the Azure CLI. For more information, see the Azure CLI documentation.
  2. At the command prompt, type: az ad sp create-for-rbac --role="Owner" --scopes="/subscriptions/<your subscription ID>/resourcegroups/<your-resource-group>" Make sure that you specify the correct resource group name.
  3. Copy the output and save it. This information is required to connect a new task runner. Note: The output includes credentials that you must protect.
  4. From the Azure AD dashboard, add the Application Administrator permission to the service principal.
# Example CLI output:

Creating 'Owner' role assignment under scope '/subscriptions/<subscription ID>/resourcegroups/test-resource-group'
The output includes credentials that you must protect. Be sure that you do not include these credentials in your code or check the credentials into your source control. For more information, see https://aka.ms/azadsp-cli

{
  "appId": "<Client ID>",
  "displayName": "azure-cli-2023-04-13-14-57-09",
  "password": "<secret>",
  "tenant": "<tenant ID>"
}

The provisioner creates the following components and performs the required related tasks:

Create an Azure Task Runner

Task runners simplify the process of running training sessions on your data.

Note: before attempting to create a task runner, ensure you have completed the Azure Task Runner Configuration.

To create an Azure task runner:

  1. Log in to your integrate.ai workspace.
  2. Click your name in the upper right corner to access the menu.
  3. Click Settings.
  4. In the left-hand navigation, under Workspace, click Task Runners.
  5. Click Connect new to start creating a task runner.
  6. Select the service provider - Microsoft Azure.
  7. Provide the following information:

Click Save. Wait for the status to change to Online. Note that you may have to refresh the page to see the updated status.

After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there's no need to create a new one for each task.

Running a training task with an Azure Task Runner

The integrateai_taskrunner_azure.ipynb notebook provides examples of how to use an Azure task runner for different federated learning tasks.

You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client, as the Task Runner performs the Client operations for you.

Download the notebook here.

Register a dataset (Azure)

For Exploratory Data Analysis (EDA), register your dataset through the workspace UI, by following the steps below.

  1. Log in to your integrate.ai workspace.
  2. Click Library in the left navigation bar.
  3. On the Datasets tab, click Register dataset.
  4. Select a task runner to manage tasks related to your dataset. Note: If no task runners exist, ask your Workspace Administrator to create one.
  5. Click Next.
  6. On the Dataset details and privacy controls page, type a name and description for the dataset.
  7. Specify the URI of the dataset, using the azure:// format.
  8. (Optional) If you have metadata to associate with the dataset, upload it in the Attachments section.
  9. To allow Custom Models to use your dataset, enable the Custom models toggle.
  10. Click Connect.

Your dataset is now registered and can be used in a notebook.

Specify a registered dataset with an Azure task runner

eda_task_group_context = (
        SessionTaskGroup(eda_session) \
        .add_task(iai_tb_aws.fls(storage_path=az_storage_path))\
        //registered dataset
        .add_task(iai_tb_aws.eda(dataset_name="silo0_az"))\  
        //non-registered dataset
        .add_task(iai_tb_aws.eda(dataset_name="dataset_two", dataset_path=passive_train_path))\  
        .start()
    )

After you have registered your dataset with a task runner, you only need to specify the dataset name in the task to be done.

In this example code for creating a task group, the first task is running the server. The second task is starting a client that uses a registered dataset. The third task is starting a client with a non-registered dataset; in this case, you must specify both dataset name and path.

Installing the SDK

To run the integrate.ai SDK samples and build models, ensure that your environment is configured properly.

Required software:

To run the sample notebook, you may also need:

Install integrate.ai components

Generate an access token

To install the SDK and other components locally, you must generate an access token through the workspace UI.

  1. Log in to your workspace through the portal.
  2. On the Dashboard, click Generate Access Token.
  3. Copy the acccess token and save it to a secure location.

Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.

Install integrate.ai packages

pip install integrate-ai

iai sdk install --token <IAI_TOKEN>

Note: If you are using a Task Runner, you can skip the steps below, as do not need to install the client or server.

iai client pull --token <IAI_TOKEN>

iai server pull --token <IAI_TOKEN>

Optional: If you are building a model for data that includes images or video, follow the steps for Setting up a Docker GPU Environment. Otherwise, you can skip the GPU instructions.

Download the sample notebooks and datasets

You can download the sample Jupyter notebooks from integrate.ai's sample repository.

Download the sample dataset to use with the end user tutorials.

The sample includes four files:

For VFL models, there are datasets for active and passive party examples available here.

Updating your local environment

When a new version of an integrate.ai package is released, follow these steps to update your local development environment.

Note: If you are using task runners, you only need to update your SDK.

Check version numbers

From a command prompt or terminal, use the pip show integrate-ai command to see the version of the integrate-ai command line interface (CLI).

To check the SDK version, run iai sdk version.

You can compare the version against the feature release version in the Release Notes to ensure that your environment is up to date.

Get the latest packages

Run the following command to update the CLI and all dependent packages. pip install -U integate-ai

Update the SDK: iai sdk install

Update the client: iai client pull

Update the server: iai server pull

END USER TUTORIALS

Tutorial - FFNet Model Training with a Sample Local Dataset (iai_ffnet)

To help you get started with federated learning in the integrate.ai system, we've provided a tutorial based on synthetic data with pre-built configuration files that you can run on your local machine, or using a task runner. In this tutorial, you will train a federated feedforward neural network (iai_ffn) using data from two datasets. The datasets, model, and data configuration are provided for you.

The sample notebook (integrateai_api.ipynb) contains runnable code snippets for exploring the SDK and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

Tip: if you are running the example locally, make sure you have completed the Environment Setup.

Open the integrateai_api.ipynb notebook to test the code as you walk through this exercise.

Understanding Models

model_config = {
    "experiment_name": "test_synthetic_tabular",
    "experiment_description": "test_synthetic_tabular",
    "strategy": {     
        "name": "FedAvg",       // Name of the federated learning strategy
        "params": {}
        },
    "model": {                  // Parameters specific to the model type 
        "params": {
            "input_size": 15, 
            "hidden_layer_sizes": [6, 6, 6], 
            "output_size": 2
                   }
            },
    "balance_train_datasets": False, // Performs undersampling on the dataset
    "ml_task": {                    // Specifies the federated learning strategy
        "type": "classification",
        "params": {
            "loss_weights": None,  
        },
    },
    "optimizer": {
        "name": "SGD",              // Name of the PyTorch optimizer used 
        "params": {
            "learning_rate": 0.2,
            "momentum": 0.0}
            },
    "differential_privacy_params": {    // Defines the differential privacy parameters
        "epsilon": 4, 
        "max_grad_norm": 7
        },
    "save_best_model": {
        "metric": "loss",           // To disable this and save the model from the last round, set to None
        "mode": "min",
    },
}

integrate.ai has several standard model classes available, including:

Model configuration

These standard models are defined using JSON configuration files during session creation. The model configuration (model_config) is a JSON object that contains the model parameters for the session.

There are five main properties with specific key-value pairs used to configure the model:

The example in the notebook is a model provided by integrate.ai. For this tutorial, you do not need to change any of the values.

Data configuration

data_config = {
    "predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
    "target": "y",
}

The data configuration is a JSON object where the user specifies predictor and target columns that are used to describe input data. This is the same structure for both GLM and FNN.

Once you have created or updated the model and data configurations, the next step is to create a training session to begin working with the model and datasets.

Authenticate with your Access Token

import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")

from integrate_ai_sdk.api import connect client = connect(token=IAI_TOKEN)

Use your access token to authenticate to the API client. The SDK simplifies the authentication process by providing a connect helper module. You import the connect helper from the SDK, provide your token, and authenticate to the client.

Set an environment variable for your token (as described in Environment Setup) or replace it inline in the sample code.

Reminder: You can generate and manage tokens through the integrate.ai web portal.

Create and start the training session

session = client.create_fl_session(
    name="Testing notebook",
    description="I am testing session creation through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_ffnet",
    model_config=model_config,
    data_config=data_config,
).start()

session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a new session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training datasets (specified as min_num_clients) and two rounds (num_rounds). It returns a session ID that you can use to track and reference your session.

The package_name specifies the federated learning model package - in the example, it is iai_ffnet however, other packages are supported. See Model packages for more information.

Join clients to the session

import subprocess
data_path = "~/Downloads/synthetic"
#Join dataset one (silo0)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
    shell=True
)
#Join dataset two (silo1)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
    shell=True
)

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. Therefore, to run this example, you will call subprocess.Popen twice to connect each dataset to the session as a separate client.

The session begins training after the minimum number of clients have joined the session.

Each client runs as a separate Docker container to simulate distributed data silos.

Poll for session results

import time

current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)

    # poll for status and round
    if current_status != training_session.status:
        print("Session status: ", training_session.status)
        current_status = training_session.status
    if current_round != training_session.round and training_session.round > 0:
        print("Session round: ", training_session.round)
        current_round = training_session.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()

print(
    "client_1 finished with return code: %d\noutput: %s\n  %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n  %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)

Depending on the type of session and the size of the datasets, sessions may take some time to run. In the sample notebook and this tutorial, we poll the server to determine the session status.

You can log information about the session during this time. In this example, we are logging the current round and the clients that have joined the session.

Another popular option is to log the session.metrics().as_dict() to view the in-progress training metrics.

Session Complete

Congratulations, you have your first federated model! You can test it by making predictions. For more information, see (Making Predictions).

Tutorial - GLM Model Training with a Sample Local Dataset (iai_glm)

An end-to-end tutorial for how to train an existing model with a synthetic dataset on a local machine.

To help you get started, we provide a tutorial based on synthetic data with pre-built configuration files. In this tutorial, you will train a federated feedforward neural network (iai_ffn) using data from two datasets. The datasets, model, and data configuration are provided for you.

The sample notebook (integrateai_api.ipynb) contains runnable code snippets for exploring the SDK and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

Before you get started, complete the Environment Setup for your local machine.

Open the integrateai_api.ipynb notebook to test the code as you walk through this exercise.

Understanding Models

model_config = {
    "experiment_name": "test_synthetic_tabular",
    "experiment_description": "test_synthetic_tabular",
    "strategy": {     
        "name": "FedAvg",       // Name of the federated learning strategy
        "params": {}
        },
    "model": {                  // Parameters specific to the model type 
        "params": {
            "input_size": 15, 
            "output_activation": "sigmoid"
                   }
            },
    "balance_train_datasets": False, // Performs undersampling on the dataset
    "ml_task": {
        "type": "logistic",
        "params": {
            "alpha": 0,
            "l1_ratio": 0
        }
    },
    "optimizer": {
        "name": "SGD",
        "params": {
            "learning_rate": 0.05,
            "momentum": 0.9
        }
    },
    "differential_privacy_params": {
        "epsilon": 4,
        "max_grad_norm": 7
    },
}

integrate.ai has a standard model class available for Feedforward Neural Nets (iai_ffn) and Generalized Linear Models (iai_glm). These standard models are defined using JSON configuration files during session creation.

The model configuration (model_config) is a JSON object that contains the model parameters for the session. There are five main properties with specific key-value pairs used to configure the model:

The example in the notebook is a model provided by integrate.ai. For this tutorial, you do not need to change any of the values.

data_config = {
    "predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
    "target": "y",
}

The data configuration is a JSON object where the user specifies predictor and target columns that are used to describe input data. This is the same structure for both GLM and FNN.

Once you have created or updated the model and data, the next step is to create a training session to begin working with the model and datasets.

Authenticate with your Access Token

import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")

from integrate_ai_sdk.api import connect client = connect(token=IAI_TOKEN)

Use your access token to authenticate to the API client. The SDK simplifies the authentication process by providing a connect helper module. You import the connect helper from the SDK, provide your token, and authenticate to the client.

Set an environment variable for your token (as described in Environment Setup) or replace it inline in the sample code.

Reminder: You can generate and manage tokens through the integrate.ai web portal.

Create and start the training session

session = client.create_fl_session(
    name="Testing notebook",
    description="I am testing session creation through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_ffnet",
    model_config=model_config,
    data_config=data_config,
).start()

session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training datasets (specified as min_num_clients) and two rounds (num_rounds). It returns a session ID that you can use to track and reference your session.

The package_name specifies the federated learning model package - in the example, it is iai_ffnet however, other packages are supported. See Model packages for more information.

Join clients to the session

import subprocess
data_path = "~/Downloads/synthetic"
#Join dataset one (silo0)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
    shell=True
)
#Join dataset two (silo1)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
    shell=True
)

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. Therefore, to run this example, you will call subprocess.Popen twice to connect each dataset to the session as a separate client.

The session begins training once the minimum number of clients have joined the session.

Each client runs as a separate Docker container to simulate distributed data silos.

Poll for session results

import time

current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)

    # poll for status and round
    if current_status != training_session.status:
        print("Session status: ", training_session.status)
        current_status = training_session.status
    if current_round != training_session.round and training_session.round > 0:
        print("Session round: ", training_session.round)
        current_round = training_session.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()

print(
    "client_1 finished with return code: %d\noutput: %s\n  %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n  %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)

Depending on the type of session and the size of the datasets, sessions may take some time to run. In the sample notebook and this tutorial, we poll the server to determine the session status.

You can log information about the session during this time. In this example, we are logging the current round and the clients that have joined the session.

Another popular option is to log the session.metrics().as_dict() to view the in-progress training metrics.

Session Complete

Congratulations, you have your first federated model! You can test it by making predictions. For more information, see (Making Predictions).

Linear Inference Sessions

An overview and example of a linear inference model for performing tasks such as GWAS in HFL.

The built-in model package iai_linear_inference trains a bundle of linear models for the target of interest against a specified list of predictors. It obtains the coefficients and variance estimates, and also calculates the p-values from the corresponding hypothesis tests. Linear inference is particularly useful for genome-wide association studies (GWAS), to identify genomic variants that are statistically associated with a risk for a disease or a particular trait.

This is a horizontal federated learning (HFL) model package.

The integrateai_linear_inference.ipynb, available in the integrate.ai sample repository, demonstrates this model package using the sample data that is available for download here.

Follow the instructions in the Installing the integrate.ai SDK section to prepare a local test environment for this tutorial.

Overview of the iai_linear_inference package

# Example model_config for a binary target

model_config_logit = {
    "strategy": {"name": "LogitRegInference", "params": {}},
    "seed": 23,  # for reproducibility
}

There are two strategies available in the package:

# Example data_config

data_config_logit = {
    "target": "y",
    "shared_predictors": ["x1", "x2"],
    "chunked_predictors": ["x0", "x3", "x10", "x11"]
}

The data_config dictionary should include the following three fields.

Note: The columns in all the fields can be specified as either names/strings or indices/integers.

With this example data configuration, the session trains four logistic regression models with y as the target, and x1, x2 plus any one of x0, x3, x10, x11 as predictors.

Create a linear inference training session

# Example training session

training_session_logit = client.create_fl_session(
    name="Testing linear inference session",
    description="I am testing linear inference session creation through a notebook",
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_linear_inference",
    model_config=model_config_logit,
    data_config=data_config_logit,
).start()

training_session_logit.id

For this example, there are two (2) training clients and the model is trained over five (5) rounds.

Argument (type) Description
name (str) Name to set for the session
description (str) Description to set for the session
min_num_clients (int) Number of clients required to connect before the session can begin
num_rounds (str) Number of rounds of federated model training to perform
package_name (str) Name of the model package to be used in the session
model_config (dict) Contains the model configuration to be used for the session
data_config (dict) Contains the data configuration to be used for the session

Ensure that you have downloaded the sample data. If you saved it to a location other than your Downloads folder, specify the data_path to the correct location.

Expected path:

data_path = "~/Downloads/synthetic"

Start the linear inference training session

import subprocess

client_1 = subprocess.Popen(
    f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1-inference --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

client_2 = subprocess.Popen(
    f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2-inference --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

This example demonstrates starting a training session locally. The clients joining the session are named client-1-inference and client-2-inference, respectively.

Poll for linear inference session status

import time

current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)

    # poll for status and round
    if current_status != training_session_logit.status:
        print("Session status: ", training_session_logit.status)
        current_status = training_session_logit.status
    if current_round != training_session_logit.round and training_session_logit.round > 0:
        print("Session round: ", training_session_logit.round)
        current_round = training_session_logit.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()

print(
    "client_1 finished with return code: %d\noutput: %s\n  %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n  %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)

Use a polling function, such as the example provided, to wait for the session to complete.

View training metrics and model details

# Example output

{'session_id': '3cdf4be992',
 'federated_metrics': [{'loss': 0.6931747794151306},
  {'loss': 0.6766608953475952},
  {'loss': 0.6766080856323242},
  {'loss': 0.6766077876091003},
  {'loss': 0.6766077876091003}],
 'client_metrics': [{'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_loss': 0.6931748060977674,
    'test_accuracy': 0.4995,
    'test_roc_auc': 0.5,
    'test_num_examples': 4000},
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6931748060977674,
    'test_accuracy': 0.4995,
    'test_roc_auc': 0.5,
    'test_num_examples': 4000}},
  {'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_num_examples': 4000,
    'test_loss': 0.6766608866775886,
    'test_roc_auc': 0.5996664746664747,
    'test_accuracy': 0.57625},
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_num_examples': 4000,
    'test_loss': 0.6766608866775886,
    'test_accuracy': 0.57625,
    'test_roc_auc': 0.5996664746664747}},
  {'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6766080602706078,
    'test_accuracy': 0.5761875,
    'test_num_examples': 4000,
...
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_accuracy': 0.5761875,
    'test_roc_auc': 0.5996632246632246,
    'test_num_examples': 4000,
    'test_loss': 0.6766078165060236}}],
 'latest_global_model_federated_loss': 0.6766077876091003}

Once the session is complete, you can view the training metrics and model details such as the model coefficients and p-values. In this example, since there are a bundle of models being trained, the metrics are the average values of all the models.

training_session_logit.metrics().as_dict()

Plot the metrics

training_session_logit.metrics().plot()

Example output:

Retrieve the trained model

# Example of retrieving p-values

pv = model_logit.p_values()
pv
#Example p-value output:

x0      112.350396
x3      82.436540
x10     0.999893
x11     27.525280
dtype: float64

The LinearInferenceModel object can be retrieved using the model's as_pytorch method. The relevant information, such as p-values, can be accessed directly from the model object.

model_logit = training_session_logit.model().as_pytorch()

Retrieve the p-values

The .p_values() function returns the p-values of the chunked predictors.

Summary information

The .summary method fetches the coefficient, standard error, and p-value of the model corresponding to the specified predictor.

#Example of fetching summary

summary_x0 = model_logit.summary("x0")
summary_x0`

Example summary output:

Making predictions from a linear inference session

from torch.utils.data import DataLoader
from integrate_ai_sdk.packages.LinearInference.dataset import ChunkedTabularDataset

ds = ChunkedTabularDataset(path=f"{data_path}/test.parquet", **data_config_logit)
dl = DataLoader(ds, batch_size=len(ds), shuffle=False)
x, _ = next(iter(dl))
y_pred = model_logit(x)
y_pred

You can also make predictions with the resulting bundle of models when the data is loaded by the ChunkedTabularDataset from the iai_linear_inference package. The predictions will be of shape (n_samples, n_chunked_predictors) where each column corresponds to one model from the bundle.

#Example prediction output:

tensor([[0.3801, 0.3548, 0.4598, 0.4809],
        [0.4787, 0.3761, 0.4392, 0.3579],
        [0.5151, 0.3497, 0.4837, 0.5054],
        ...,
        [0.7062, 0.6533, 0.6516, 0.6717],
        [0.3114, 0.3322, 0.4257, 0.4461],
        [0.4358, 0.4912, 0.4897, 0.5110]], dtype=torch.float64)
`</span><span class="err">

MODEL TRAINING

Gradient Boosted Models (HFL-GBM)

Gradient boosting is a machine learning algorithm for building predictive models that helps minimize the bias error of the model. The gradient boosting model provided by integrate.ai is an HFL model that uses the sklearn implementation of HistGradientBoostingClassifier for classifier tasks and HistGradientBoostingRegresssor for regression tasks.

The GBM sample notebook (integrateai_api_gbm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

Prerequisites

  1. Open the integrateai_api_gbm.ipynb notebook to test the code as you walk through this tutorial.
  2. Download the sample dataset to use with this tutorial. The sample files are: train_silo0.parquet - training data for dataset 1 train_silo1.parquet - training data for dataset 2 test.parquet - test data, to be used when joining a session

Review the sample Model Configuration

data_schema = {
    "predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
    "target": "y",
}

model_config = {
    "strategy": {
        "name": "HistGradientBoosting",
        "params": {}
    },
    "model": {
        "params": {
            "max_depth": 4, 
            "learning_rate": 0.05,
            "random_state": 23, 
            "max_bins": 128,
            "sketch_relative_accuracy": 0.001,
            "max_iter": 100,
        }
    },

    "ml_task": {"type": "classification", "params": {}}, 

    "save_best_model": {
        "metric": None,
        "mode": "min"
    },
  }

integrate.ai has a model class available for Gradient Boosted Models, called iai_gbm. This model is defined using a JSON configuration file during session creation.

The strategy for GBM is HistGradientBoosting.

You can adjust the following parameters as needed:

Set the machine learning task type to either classification or regression.

Specify any parameters associated with the task type in the params section.

The save_best_model option allows you to set the metric and mode for model saving. By default this is set to None, which saves the model from the previous round, and min.

The notebook also provides a sample data schema. For the purposes of testing GBM, use the sample schema as shown.

Create a GBM training session

training_session = client.create_fl_session(
    name="HFL session testing GBM",
    description="I am testing GBM session creation through a notebook",
    min_num_clients=2,
    num_rounds=10,
    package_name="iai_gbm",
    model_config=model_config,
    data_config=data_schema,
).start()

training_session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training clients (two datasets) and ten rounds (or trees). It returns a session ID that you can use to track and reference your session.

Join the training session

import subprocess

data_path = "~/Downloads/synthetic"

client_1 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

client_2 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.

Poll for Session Results

Sessions may take some time to run depending on the computer environment. In the sample notebook and this tutorial, we poll the server to determine the session status.

View the training metrics

// Retrieve the model metrics

training_session.metrics().as_dict()

After the session completes successfully, you can view the training metrics and start making predictions. 1. Retrieve the model metrics as_dict. 2. Plot the metrics.

Example output:

{'session_id': '9fb054bc24',
 'federated_metrics': [{'loss': 0.6876291940808297},
  {'loss': 0.6825978879332543},
  {'loss': 0.6780059585869312},
  {'loss': 0.6737175708711147},
  {'loss': 0.6697578155398369},
  {'loss': 0.6658972384035587},
  {'loss': 0.6623568458259106},
  {'loss': 0.6589517279565335},
  {'loss': 0.6556690569519996},
  {'loss': 0.6526266353726387},
  {'loss': 0.6526266353726387}],
 'rounds': [{'user info': {'test_loss': 0.6817054933875072,
    'test_roc_auc': 0.6868823702288674,
    'test_accuracy': 0.7061688311688312,
    'test_num_examples': 8008},
   'user info': {'test_accuracy': 0.5720720720720721,
    'test_num_examples': 7992,
    'test_roc_auc': 0.6637941733389123,
    'test_loss': 0.6935647668830733}},
  {'user info': {'test_accuracy': 0.5754504504504504,
    'test_roc_auc': 0.6740578481919338,
    'test_num_examples': 7992,
    'test_loss': 0.6884922753070576},
   'user info': {'test_loss': 0.6767152831608197,
...
   'user info': {'test_loss': 0.6578156923815811,
    'test_num_examples': 7992,
    'test_roc_auc': 0.7210704078520924,
    'test_accuracy': 0.6552802802802803}}],
 'latest_global_model_federated_loss': 0.6526266353726387}

Plot the metrics

// Plot the metrics

fig = training_session.metrics().plot()

Example (image)

model = training_session.model().as_sklearn()
model

Example (image)

Load the test data

import pandas as pd

test_data = pd.read_parquet(f"{data_path}/test.parquet")
test_data.head()

Example (image)

Convert test data to tensors

Y = test_data["y"]

X = test_data[["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"]]

Run model predictions

model.predict(X)

Result: array([0, 1, 0, ..., 0, 0, 1])

from sklearn.metrics import roc_auc_score
y_hat = model.predict_proba(X)
roc_auc_score(Y, y_hat[:, 1])

Result: 0.7082738332738332

Note: When the training sample sizes are small, this model is more likely to overfit the training data.

Private Record Linkage (PRL) sessions

Private record linkage sessions create intersection and alignment among datasets to prepare them for vertical federated learning (VFL).

In a vertical federated learning process, two or more parties collaboratively train a model using datasets that share a set of overlapping features. These datasets generally each contain distinct data with some overlap. This overlap is used to define the intersection of the sets. Private record linkage (PRL) uses the intersection to create alignment between the sets so that a shared model can be trained.

Overlapping records are determined privately through a PRL session, which combines Private Set Intersection with Private Record Alignment.

For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for model training.

PRL Session Overview

In PRL, two parties submit paths to their datasets so that they can be aligned to perform a machine learning task.

  1. ID columns (id_columns) are used to produce a hash that is sent to the server for comparison. The secret for this hash is shared between the clients and the server has no knowledge of it. This comparison is the Private Set Intersection (PSI) part of PRL.
  2. Once compared, the server orchestrates the data alignment because it knows which indices of each dataset are in common. This is the Private Record Alignment (PRA) part of PRL.

For information about privacy when performing PRL, see PRL Privacy for VFL.

PRL Session Example

Use the integrateai_batch_client_prl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required.

This example uses AWS Fargate and Batch to run the session.

  1. Complete the Environment Setup.
  2. Ensure that you have the correct roles and policies for Fargate and Batch. See Running AWS Batch jobs with the SDK and Running a training server on AWS Fargate for details.
  3. Authenticate to the integrate.ai API client.

Create a configuration for the PRL session.

# Example data config

prl_data_config = {
    "clients": {
        "passive_client": {"id_columns": ["id"]},
        "active_client": {"id_columns": ["id"],},
    }
}

Specify a prl_data_config that indicates the columns to use as identifiers when linking the datasets to each other. The number of items in the config specifies the number of expected clients. In this example, there are two items and therefore two clients submitting data. Their datasets are linked by the "id" column in any provided datasets.

Create the session

# Create session example

prl_session = client.create_prl_session(
    name="Testing notebook - PRL",
    description="I am testing PRL session creation through a notebook",
    data_config=prl_data_config
).start()

prl_session.id

To create the session, specify the data_config that contains the client names and columns to use as identifiers to link the datasets. For example: prl_data_config.

These client names are referenced for the compute on the PRL session and for any sessions that use the PRL session downstream.

Specify AWS parameters and credentials

# Example data paths in s3 
active_train_oath = 's3://<path to dataset>/active_train.csv'
passive_train_path = 's3://<path to dataset>/passive_train.csv'
active_test_path = 's3://<path to dataset>/active_test.csv'
passive_test_path = 's3://<path to dataset>/passive_test.csv'

# Specify the AWS parameters
cluster = "iai-server-ecs-cluster"
task_definition = "iai-server-fargate-job"
model_storage = "s3://<path to storage>"
security_group = "<security group name>"
subnet_id = "<subnet>" # Public subnet (routed via IGW)
job_queue='iai-client-batch-job-queue'
job_def='iai-client-batch-job'

# Example of using temporary credentials
aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'REGION': os.environ.get("AWS_REGION"),
}
  1. Specify the paths to the datasets and the AWS Batch job information.
  2. Specify your AWS credentials if you are generating temporary ones. Otherwise, use the default profile credentials.

Create a task builder and task group

from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup

# Creating task builder objects

task_server = taskbuilder_aws.fargate(
    cluster=cluster,
    task_definition=task_definition)

tb = taskbuilder_aws.batch( 
    job_queue=job_queue,
    aws_credentials=aws_creds,
    cpu_job_definition=job_def)

task_group_context = SessionTaskGroup(prl_session)\
    .add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=<client>))\
    .add_task(tb.prl(train_path=passive_train_path, test_path=passive_test_path, vcpus='2', memory='16384', client=client, client_name="passive_client"))\
        .add_task(tb.prl(train_path=active_train_path, test_path=active_test_path, vcpus='2', memory='16384', client=client, client_name="active_client")).start()
  1. Set up the task builder and task group.
  2. Import the taskbuilder and taskgroup from the SDK.
  3. Specify the server and batch information to create the task builder objects.
  4. Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.
  5. Set the train_path, test_path, and client_name for each task. The client_name must be the same name as specified in the data_config file.
  6. Optional: Set the vcpus and memory parameters are to override the Batch job definition.

Monitor submitted jobs

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK. The following code returns the session ID that is included in the job name.

# session available in group context after submission
print(task_group_context.session.id)

Next, you can check the status of the tasks.

# status of tasks submitted
task_group_status = task_group_context.status()
for task_status in task_group_status:
    print(task_status)

Submitted tasks are in the pending state until the clients join and the session is started. Once started, the status changes to running.

# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
task_group_context.wait(300)

View the overlap statistics

When the session is complete, you can view the overlap statistics for the datasets.

prl_session.metrics().as_dict()

Example result:

{'session_id': '07d0f8358d',
 'federated_metrics': [],
 'client_metrics': {'passive_client': {'train': {'n_records': 14400,
    'n_overlapped_records': 12963,
    'frac_overlapped': 0.9},
   'test': {'n_records': 3600,
    'n_overlapped_records': 3245,
    'frac_overlapped': 0.9}},
  'active_client': {'train': {'n_records': 14400,
    'n_overlapped_records': 12963,
    'frac_overlapped': 0.9},
   'test': {'n_records': 3600,
    'n_overlapped_records': 3245,
    'frac_overlapped': 0.9}}}}

To run a VFL training session on the linked datasets, see VFL FFNet Model Training).

To perform exploratory data analysis on the intersection, see EDA Intersect.

VFL SplitNN Model Training

In a vertical federated learning (VFL) process, two or more parties collaboratively train a model using datasets that share a set of overlapping features. Each party has partial information about the overlapped subjects in the dataset. Therefore, before running a VFL training session, a private record linkage (PRL) session is performed to find the intersection and create alignment between datasets.

There are two types of parties participating in the training:

For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for VFL model training.

VFL Session Overview

A hospital may have patient blood tests and outcome information on cancer, but imaging data is owned by an imaging centre. They want to collaboratively train a model for cancer diagnosis based on the imaging data and blood test data. The hospital (active party) would own the outcome and patient blood tests and the Imaging Centre (passive party) would own the imaging data.

A simplified model of the process is shown below.

integrate.ai VFL Flow

The following diagram outlines the training flow in the integrate.ai implementation of VFL.

VFL Training Session Example

Use the integrateai_fargate_batch_client_vfl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required. You can download sample notebooks here.

The notebook demonstrates both the PRL session and the VFL train and predict sessions.

Note: This example uses AWS Fargate and Batch to run the session.

Complete the Environment Setup

model_config = {
    "strategy": {"name": "SplitNN", "params": {}},
    "model": {
        "feature_models": {
            "passive_client": {"params": {"input_size": 7, "hidden_layer_sizes": [6], "output_size": 5}},
            "active_client": {"params": {"input_size": 8, "hidden_layer_sizes": [6], "output_size": 5}},
        },
        "label_model": {"params": {"hidden_layer_sizes": [5], "output_size": 2}},
    },
    "ml_task": {
        "type": "classification",
        "params": {
            "loss_weights": None,
        },
    },
    "optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
    "seed": 23,  # for reproducibility
}
data_config = {
        "passive_client": {
            "label_client": False,
            "predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"],
            "target": None,
        },
        "active_client": {
            "label_client": True,
            "predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"],
            "target": "y",
        },
    }
  1. Ensure that you have the correct roles and policies for Fargate and Batch. See AWS Batch and Fargate Manual Setup for details.
  2. Authenticate to the integrate.ai API client.
  3. Run a PRL session to obtain the aligned dataset. This session ID is required for the VFL training session.
  4. Create a model_config and a data_config for the VFL session.

Parameters:

Create and start a VFL training session

vfl_train_session = client.create_vfl_session(
    name="Testing notebook - VFL Train",
    description="I am testing VFL Train session creation through a notebook",
    prl_session_id=prl_session.id,
    vfl_mode='train',
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_ffnet",
    data_config=data_config,
    model_config=model_config,
    startup_mode="external"
).start()

vfl_train_session.id

Specify the PRL session ID and ensure that the vfl_mode is set to train.

Set up the task builder and task group for VFL training

vfl_task_group_context = SessionTaskGroup(vfl_train_session)\
        .add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=client))\
        .add_task(tb.vfl_train(train_path=passive_train_path, test_path=passive_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=model_storage, client=client, client_name="passive_client"))\
        .add_task(tb.vfl_train(train_path=active_train_path, test_path=active_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=model_storage, client=client, client_name="active_client")).start()

In the sample notebook, the server and client task builders are set up in the PRL session workflow, so you only have to create the VFL task group.

If you are not using the notebook, ensure that you import the required packages from the SDK. Create a task in the task group for the server, and for each client. The number of client tasks in the task group must match the number of clients specified in the data_config used to create the session.

The following parameters are required for each client task:

The vcpus and memory parameters are optional overrides for the job definition.

Monitor submitted VFL training jobs

# session available in group context after submission
print(vfl_task_group_context.session.id)

# status of tasks submitted
vfl_task_group_status = vfl_task_group_context.status()
for task_status in vfl_task_group_status:
    print(task_status)

# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
vfl_task_group_context.wait(300)

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.

View the VFL training metrics

# Example results

{'session_id': '498beb7e6a',
 'federated_metrics': [{'loss': 0.6927943530912943},
  {'loss': 0.6925891094472265},
  {'loss': 0.6921983339753467},
  {'loss': 0.6920029462394067},
  {'loss': 0.6915351291650617}],
 'client_metrics': [{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.5286237121001411,
    'test_num_examples': 3245,
    'test_loss': 0.6927943530912943,
    'test_accuracy': 0.5010785824345146}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_num_examples': 3245,
    'test_accuracy': 0.537442218798151,
    'test_roc_auc': 0.5730010669487545,
    'test_loss': 0.6925891094472265}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_accuracy': 0.550693374422188,
    'test_roc_auc': 0.6073282812853845,
    'test_loss': 0.6921983339753467,
    'test_num_examples': 3245}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_loss': 0.6920029462394067,
    'test_roc_auc': 0.6330078151716465,
    'test_accuracy': 0.5106317411402157,
    'test_num_examples': 3245}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.6495852274713467,
    'test_loss': 0.6915351291650617,
    'test_accuracy': 0.5232665639445301,
    'test_num_examples': 3245}}]}

Once the session completes successfully, you can view the training metrics.

vfl_train_session.metrics().as_dict()

Plot the VFL training metrics.

fig = vfl_train_session.metrics().plot()

Example of plotted training metrics

VFL Prediction Session Example

This example continues the workflow in the previous sections: PRL Session Example and VFL Training Session Example.

Create and start a VFL prediction session

# Example configuration of a VFL predict session

vfl_predict_session = client.create_vfl_session(
    name="Testing notebook - VFL Predict",
    description="I am testing VFL Predict session creation through a notebook",
    prl_session_id=prl_session.id,
    training_session_id=vfl_train_session.id,
    vfl_mode='predict',
    data_config=data_config
).start()

vfl_predict_session.id

To create a VFL prediction session, specify the PRL session ID (prl_session_id) and the VFL train session ID (training_session_id) from your previous succesful PRL and VFL sessions.

Set the vfl_mode to predict.

Specify the full path for the storage location for your predictions, including the file name.

active_predictions_storage_path="<full path of predictions file name>"

Create and start a task group for the prediction session

# Example of setting up a task group

vfl_predict_task_group_context = SessionTaskGroup(vfl_predict_session)\
        .add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=client))\
        .add_task(tb.vfl_predict(client_name='active_client', dataset_path=active_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=active_predictions_storage_path, client=client, raw_output=True))\
        .add_task(tb.vfl_predict(client_name='passive_client', dataset_path=passive_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path="passive_no_path", client=client, raw_output=True))
        .start()

Monitor submitted VFL prediction jobs

# Example of monitoring tasks

print(vfl_predict_task_group_context.session.id)

vfl_predict_task_group_status = vfl_predict_task_group_context.status()
for task_status in vfl_predict_task_group_status:
    print(task_status)

vfl_predict_task_group_context.wait(300)

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.

View VFL predictions

After the predict session completes successfully, you can view the predictions from the Active party and evaluate the performance.

import pandas as pd

df_pred = pd.read_csv(active_predictions_storage_path)

df_pred.head()

Example output:

Gradient Boosted Models for VFL

Gradient boosting is a machine learning algorithm for building predictive models that helps minimize the bias error of the model. The gradient boosting model for VFL provided by integrate.ai uses the sklearn implementation of HistGradientBoostingClassifier for classifier tasks and HistGradientBoostingRegresssor for regression tasks.

The VFL-GBM sample notebook (integrateai_taskrunner_vfl_gbm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information.

Prerequisites

  1. Open the integrateai_taskrunner_vfl_gbm.ipynb notebook to test the code as you walk through this tutorial.
  2. Download the sample dataset to use with this tutorial. The sample files are: active_train.parquet - training data for the active party active_test.parquet - test data for the active party, used when joining a session passive_train.parquet - training data for the passive party passive_test.parquet - test data for the passive party, used when joining a session

Run a PRL session to align the datasets for VFL-GBM

Before you run a VFL session, you must first run a PRL session to align the datasets. The sample notebook provides two examples of running a PRL session with different match thresholds.

For more information, see Private Record Linkage (PRL) Sessions.

Review the sample Model Configuration for GBM

```model_config = { "strategy": {"name": "VflGbm", "params": {"hide_intersection": False}}, "model": { "params": { "max_depth": 4, "learning_rate": 0.05, "random_state": 23, "max_bins": 255, } }, "ml_task": {"type": 'classification', "params": {}}, }

data_config = { "passive_client": { "label_client": False, "predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"], "target": None, }, "active_client": { "label_client": True, "predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"], "target": "y", }, } ```

integrate.ai has a model class available for Gradient Boosted Models, called iai_gbm. This model is defined using a JSON configuration file during session creation.

The strategy for VFL-GBM is VflGbm. Note that this is different than the strategy name for an HFL GBM session, which is HistGradientBoosting.

You can adjust the following parameters as needed:

For more information, see the scikit documentation for HistGradientBoostingClassifier.

Set the machine learning task type to either classification or regression.

Specify any parameters associated with the task type in the params section.

The notebook also provides a sample data schema. For the purposes of testing VFL-GBM, use the sample schema as shown.

Create a VFL-GBM training session

vfl_train_session = client.create_vfl_session(
    name="Testing notebook - VFL-GBM training",
    description="I am testing VFL GBM training session creation through a notebook",
    prl_session_id=prl_session.id,
    vfl_mode="train",
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_ffnet",
    data_config=data_config,
    model_config=model_config,
    startup_mode="external"
).start()

vfl_train_session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training clients (two datasets) and five rounds. It returns a session ID that you can use to track and reference your session.

Create and start a task group to run the training session

# Create and start a task group with one task for the server, and one for each of the clients joining the session

vfl_task_group_context = (SessionTaskGroup(vfl_train_session)\
    .add_task(iai_tb_aws.fls(storage_path=storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=active_train_path, 
                                    test_path=active_test_path, 
                                    batch_size=1024,
client_name="active_client", storage_path=aws_storage_path))\ .add_task(iai_tb_aws.vfl_train(train_path=passive_train_path, test_path=passive_test_path, batch_size=1024, client_name="passive_client", storage_path=aws_storage_path))\ .start())

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.

Each client must have a unique name that matches the name specified in the data_config. For example, active_client and passive_client.

Poll for Session Results

# Check the status of the tasks

for i in vfl_task_group_context.contexts.values(): print(json.dumps(i.status(), indent=4))

vfl_task_group_context.monitor_task_logs()

vfl_task_group_context.wait(60*5, 2)

Sessions may take some time to run depending on the computer environment. In the sample notebook and this tutorial, we poll the server to determine the session status.

View the training metrics

metrics = vfl_train_session.metrics().as_dict()
metrics

fig = vfl_train_session.metrics().plot()

After the session completes successfully, you can view the training metrics and start making predictions. 1. Retrieve the model metrics as_dict. 2. Plot the metrics.

VFL-GBM Prediction

# Create and start a VFL predict session

vfl_predict_session = client.create_vfl_session( name="Testing notebook - VFL Predict", description="I am testing VFL Predict session creation with an AWS task runner through a notebook", prl_session_id=prl_session.id, training_session_id=vfl_train_session.id, vfl_mode="predict", data_config=data_config, startup_mode="external" ).start()

vfl_predict_session.id

After you have completed a successful PRL and VFL train session, you can use those sessions to create a VFL prediction session.

For more information about VFL predict mode, see VFL Prediction Session Example.

DATA ANALYSIS

EDA in Individual Mode

The Exploratory Data Analysis (EDA) feature for horizontal federated learning (HFL) enables you to access summary statistics about a group of datasets without needing access to the data itself. This allows you to get a basic understanding of the dataset when you don't have access to the data or you are not allowed to do any computations on the data.

EDA is an important pre-step for federated modelling and a simple form of federated analytics. The feature has a built in differential privacy setting. Differential privacy (DP) is dynamically added to each histogram that is generated for each feature in a participating dataset. The added privacy protection causes slight noise in the end result.

At a high level, the process is similar to that of creating and running a session to train a model. The steps are:

  1. Authenticate with your access token.
  2. Configure an EDA session.
  3. Create and start the session.
  4. Run the session and poll for session status.
  5. Analyze the datasets.

The sample notebook (integrate_ai_api.ipynb) provides runnable code examples for exploring the API, including the EDA feature, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

API Reference for EDA

The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session.

If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.

This tutorial assumes that you have correctly configured your environment for working with integrate.ai, as described in Installing the SDK.

Configure an EDA Session

To begin exploratory data analysis, you must first create a session, the same as you would for training a model. In this case, to configure the session, you must specify either the dataset_config, or num_datasets argument.

Using a dataset_config file

The dataset_config file is a configuration file that maps the name of one or more datasets to the columns to be pulled. Dataset names and column names are specified as key-value pairs in the file.

For each pair, the keys are dataset names that are expected for the EDA analysis. The values are a list of corresponding columns. The list of columns can be specified as column names (strings), column indices (integers), or a blank list to retrieve all columns from that particular dataset.

If a dataset name is not included in the configuration file, all columns from that dataset are used by default.

For example:

To retrieve all columns for a submitted dataset named dataset_one:

dataset_config = {"dataset_one": []}

To retrieve the first column and the column x2 for a submitted dataset named dataset_one:

dataset_config = {"dataset_one": [1,"x2"]}

To retrieve the first column and the column x2 for a submitted dataset named dataset_one and all columns in a dataset named dataset_two:

dataset_config = {"dataset_one": [1,"x2"],"dataset_two": []}

Specifying the number of datasets

You can manually set the number of datasets that are expected to be submitted for an EDA session by specifying a value for num_datasets.

If num_datasets is not specified, the number is inferred from the number of datasets provided in dataset_config.

Create and start an EDA session

dataset_config = {"dataset_one": [1,"x5","x7"], "dataset_two": ["x1","x10","x11"]}

eda_session = client.create_eda_session(
    name="Testing notebook - EDA",
    description="I am testing EDA session creation through a notebook",
    data_config=dataset_config
).start()

eda_session.id

The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on two datasets, named dataset_one and dataset_two. It returns an EDA session ID that you can use to track and reference your session.

The dataset_config used here specifies that the first column (1), column x5, and column x7 will be analyzed for dataset_one and columns x1, x10, and x11 will be analyzed for dataset_two.

Since the num_datasets argument is not provided to client.create_eda_session(), the number of datasets is inferred as two from the dataset_config.

For more information, see the create_eda_session() definition in the API documentation.

Poll for session status

# Example of polling 

import time

while eda_session.status == "running": 
    if eda_session.round is None: 
        print("Polling for Session info")
    if eda_session.round is not None:
        if eda_session.round > 0:
            print("EDA session in progress.")
        else:
            print("EDA Session clean up")
    print("-------------------------")
    time.sleep(1)

You can log information about the session during this time.

Analyze the EDA results

The results object is a dataset collection comprised of multiple datasets that can be retrieved by name. Each dataset is comprised of columns that can be retrieved by either column name or by index.

You can perform the same base analysis functions at the collection, dataset, or column level.

results = eda_session.results()

*Example output:

EDA Session collection of datasets: ['dataset_two', 'dataset_one']

Describe

Use the .describe() method to retrieve a standard set of descriptive statistics.

If a statistical function is invalid for a column (for example, mean requires a continuous column and x10 is categorical), or the column from one dataset is not present in the other (for example, here x5 is in dataset_one, but not dataset_two), then the result is NaN.

results.describe()

results["dataset_one"].describe()

Statistics

For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.

results["dataset_one"][["x10", "x11"]].uniques()

You can call functions such as .mean(), .median(), and .std() individually.

results["dataset_one"].mean()

results["dataset_one"]["x1"].mean()

Histograms

You can create histogram plots using the .plot_hist() function.

saved_dataset_one_hist_plots = results["dataset_two"].plot_hist()

single_hist = results["dataset_two"]["x1"].plot_hist()

EDA in Intersect Mode

The Exploratory Data Analysis (EDA) Intersect feature for private record linkage (PRL) sessions enables you to access summary statistics about a group of datasets without needing access to the data itself. This allows you to get a basic understanding of the dataset when you don't have access to the data or you are not allowed to do any computations on the data. It enables you to understand more about the intersection between two datasets.

EDA is an important pre-step for federated modelling and a simple form of federated analytics. The feature has a built in differential privacy setting. Differential privacy (DP) is dynamically added to each histogram that is generated for each feature in a participating dataset. The added privacy protection causes slight noise in the end result.

At a high level, the process is similar to that of creating and running a session to train a model. The steps are:

  1. Run a PRL session to determine the intersection between your datasets.
  2. Configure an EDA Intersect session.
  3. Create and start the session.
  4. Run the session and poll for session status.
  5. Analyze the results.

Use the (integrateai_eda_intersect_batch.ipynb) notebook to follow along and test the examples shown below by filling in your own variables as required. This documentation provides supplementary and conceptual information to expand on the code demonstration.

API Reference

The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session. If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.

Configure an EDA Intersect Session

eda_data_config = {"prl_sil0":[],"prl_silo1":[]} 
prl_session_id = "<PRL_SESSION_ID>"

This example uses AWS Batch to run the session using data in S3 buckets. Ensure that you have completed the AWS Batch and Fargate Manual Setup and that you have the correct roles and policies for AWS Batch and Fargate. See Running AWS Batch jobs with the SDK for details.

To begin exploratory data analysis, you must first create a session, the same as you would for training a model. To configure the session, specify the following:

The eda_data_config specifies the names of the datasets used to generate the intersection in the PRL session in the format dataset_name : columns. If columns is empty ([]), then EDA is performed on all columns.

You must also specify the session ID of a successful PRL session using the datasets listed in the eda_data_config.

Column Correlation

If you want to find the correlation (or any other binary operation) between two specific columns, you must specify those columns as paired columns to ensure that the 2D histogram is calculated for them. Otherwise, only 1D histograms are generated for the columns separately, similar to the individual mode, except that they are constructed with only the overlapping samples from the intersection.

To set which pairs you are interested in, specify their names in a dictionary like data_config.

For example: {"passive_client": ['x1', 'x5'], "active_client": ['x0', 'x2']}

will generate 2D histograms for these pairs of columns:

(x0, x1), (x0, x5), (x2, x1), (x2, x5), (x0, x2), (x1, x5)

paired_cols = {"active_client": ['x0', 'x2'], "passive_client": ['x1']}

Create and start an EDA intersect session

eda_session = client.create_eda_session(
    name="EDA Intersect Session",
    description="Testing EDA Intersect mode through a notebook",
    data_config=eda_data_config,
    eda_mode="intersect",  #Generates histograms on an overlap of two distinct datasets
    prl_session_id=prl_session_id
    hide_intersection=True,   #Optional - specifies whether to apply CKKS encryption when generating the output. Defaults to true.
    paired_columns=paired_cols  #Optional - only required to generate 2D histograms
).start()

eda_session.id

The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on the intersection of two distinct datasets. It returns an EDA session ID that you can use to track and reference your session.

For more information, see the create_eda_session() definition in the API documentation.

Start an EDA Intersect session using AWS Batch

Example data paths in s3

train_path1 = 's3://sample-data.integrate.ai/prl/prl_silo0.csv'
train_path2 = 's3://sample-data.integrate.ai/prl/prl_silo1.csv'
test_path1 = 's3://sample-data.integrate.ai/prl/prl_silo0.csv'
test_path2 = 's3://sample-data.integrate.ai/prl/prl_silo1.csv'

Specify the AWS parameters

model_storage = "s3://sample-data.integrate.ai"
job_queue='iai-client-batch-job-queue'
job_def='iai-client-batch-job'

Specify your AWS credentials if you are generating temporary ones. Otherwise, use the default profile credentials.

Import the task builder and task group from the SDK.

from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup

Specify the batch information to create the task builder object.

Create the task.

tb = taskbuilder_aws.batch( 
    job_queue=job_queue,
    aws_credentials=aws_creds,
    cpu_job_definition=job_def)

Create and run the task group.

task_group_context = SessionTaskGroup(eda_session)\
.add_task(tb.eda(dataset_path=train_path1, dataset_name="prl_silo0", vcpus='2', memory='16384', client=client))\
.add_task(tb.eda(dataset_path=train_path2, dataset_name="prl_silo1", vcpus='2', memory='16384', client=client)).start()
.start()

Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.

The vcpus and memory parameters are optional overrides for the job definition.

Monitor submitted EDA Intersect jobs

# session available in group context after submission
print(task_group_context.session.id)

# status of tasks submitted
task_group_status = task_group_context.status()
for task_status in task_group_status:
    print(task_status)

# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
task_group_context.wait(300)

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.

Submitted tasks are in the pending state until the clients join and the session is started. Once started, the status changes to running.

When the session completes successfully, "True" is returned. Otherwise, an error message appears.

Analyze the results

To retrieve the results of the session:

results = eda_session.results()

Example output:

EDA Session collection of datasets: ['active_client', 'passive_client']

Describe

You can use the .describe() function to review the results.

results.describe()

Example output:

Statistics

For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.

results["active_client"][["x10", "x11"]].uniques()

Example output:

Mean

You can call functions such as .mean(), .median(), and .std() individually.

results["active_client"].mean()

Example output:

Histograms

You can create histogram plots using the .plot_hist() function.

saved_dataset_one_hist_plots = results["active_client"].plot_hist()

Example output:

single_hist = results["active_client"]["x10"].plot_hist()

Example output:

2D Histograms

You can also plot 2D-histograms of specified paired columns.

fig = results.plot_hist(active_client['x0'], passive_client['x1'])

Example output:

Correlation

You can perform binary calculations on columns specified in paired_columns, such as finding the correlation.

results.corr(active_client['x0'], passive_client['x1'])

Example output:

Addition, subtraction, division

Addition example. Change the operator to try subtraction, division, etc.

op_res = active_client['x0']+passive_client['x1'] fig = op_res.plot_hist()

Example output:

GroupBy

groupby_result = results.groupby(active_client['x0'])[passive_client['x5']].mean() print (groupby_result)

Principal Component Analysis (PCA)

Principal Component Analysis (PCA) is a dimensionality reduction technique that helps rule out features that are not important to the model. In other words, it generates a transformation to apply on a dataset to simplify the number of features used while retaining enough information in the features to represent the original dataset.

The main use of PCA is to help researchers uncover the internal pattern underlying the data. This pattern is then helpful mainly in two applications:

  1. Helps discover new groups and clusters among patients
  2. More efficient modelling: achieving the same or even better predictive power with less input and simpler models.

The iai_pca session outputs a transformation matrix that is generated in a distributed manner. This transformation can be applied to each dataset of interest.

The underlying technology of this session type is FedSVD (federated singular value decomposition), which can be used in other dimensionality reduction methods.

For a real-world example of using a PCA session, see the pca_face_decomposition.ipynb notebook available in the sample repository. Sample data for running this notebook is available in a zip file here.

Configure a PCA Session

model_config = {
    "experiment_name": "test_pca",
    "experiment_description": "test_pca",
    "strategy": {"name": "FedPCA", "params": {}},
    "model": {
        "params": {
            "n_features": 10,   # optional
            "n_components": 3,
            "whiten": False
        }
    },
}

data_config = {
    "predictors": []
}

This tutorial assumes that you have correctly configured your environment for working with integrate.ai, as described in Installing the SDK and Using integrate.ai. It uses locally run Docker containers for the client and server.

This example describes how to train an HFL session with the built-in package iai_pca, which performs the principal component analysis on the raw input data, and outputs the desired number of principal axes in the feature space along with the corresponding eigenvalues. The resulting model can be used to transform data points from the raw input space to the principal component space.

Sample model config and data config for PCA

In the model_config, use the strategy FedPCA with the iai_pca package.

There are three parameters that can be configured:

The example performs PCA on the 10 input features and outputs the first 3 principal axes.

Create and start a PCA training session

pca_session = client.create_fl_session(
    name="Testing PCA ",
    description="Testing a PCA session through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_pca",
    model_config=model_config,
    data_config=data_config,
).start()

pca_session.id

Note that the num_rounds parameter is ignored when iai_pca is used, because PCA sessions determine the number of rounds required automatically.

Make sure that the sample data you downloaded is either saved to your ~/Downloads directory, or you update the data_path below to point to the sample data.

data_path = "~/Downloads/synthetic"

Create a task group for the server and the two clients.

from integrate_ai_sdk.taskgroup.taskbuilder import local
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup


tb = local.local_docker(
    client,
    docker_login=False,

)

task_group_context = (
    SessionTaskGroup(pca_session)
    .add_task(tb.fls(storage_path=storage_path, docker_options={"cpu_shares": 512}))\
    .add_task(tb.hfl(train_path=train_path1, test_path=test_path, client_name="client1"))\
    .add_task(tb.hfl(train_path=train_path2, test_path=test_path, client_name="client2"))\
    .start()
)

Wait for the session to complete successfully. This code will return True when it is complete.

task_group_context.wait(60 * 5)

View PCA session results

Note that the reverse process of mapping data points from the principal component space back to the original feature space can be treated as a multivariate regression task (i.e., reconstruct the raw features). We log some regression metrics (e.g., loss/mse and r2) for PCA sessions. They can be pulled as follows.

training_session.metrics().as_dict()

The PCA results can be retrieved as follows. It is a standard PyTorch model object, and the .state_dict() method can be used to show all stored parameters, including the principal axes matrix and eigenvalues.

pca_transformation = training_session.model().as_pytorch()
pca_transformation.state_dict()

We can also transform data points from the original feature space to the principal component space by directly calling the model object on the data tensors.

import pandas as pd

test_data = pd.read_parquet(f"{data_path}/test.parquet")
test_data.head()
import torch

test_data_tensor = torch.tensor(test_data[data_schema["predictors"]].values)
test_data_pc = pca_transformation(test_data_tensor)
test_data_pc.shape

MANUAL AWS DEPLOYMENT AND USE

AWS Batch and Fargate Manual Setup

AWS Requirements

This section outlines the setup steps required to configure your working environment. Steps that are performed in the AWS platform are not explained in detail. Refer to the AWS documentation as needed.

The requirements are tool-agnostic - that is, you can complete the steps through the AWS console, or through a tool such as Terraform or AWS CloudFormation.

AWS Authentication

aws ssm put-parameter --name iai-token --value <IAI_TOKEN> --type SecureString

Complete the steps to create an IAI access token, hereafter referred to as .

On the AWS CLI, set the token as a parameter for your SSM agent. SSM handles getting and using the token as needed for the batch session.

# Example response

{
    "Version": 1,
    "Tier": "Standard"
}

About "secrets"

In order for the batch job to access the integrate.ai JWT through SSM it needs to be set in the job definition secrets configuration.

Set the name of the secret to IAI_TOKEN to create the IAI_TOKEN environment variable that the docker client uses to authenticate with the session.

The valueFrom is the SSM key that contains the integrate.ai access token. If you are running the batch job in the same region that the SSM parameter was created in, pass in the name of the SSM parameter. If the SSM parameter is in a different region, pass in the SSM parameter ARN.

To have different tokens for different user groups, you must have a different batch job definition for each token.

Create an AWS KMS Key

If you do not already have one, define a symmetric key for encryption and decryption in AWS KMS. The IAI server uses the ID of your key to retrieve the IAI access token through SSM.

  1. In the KMS console, click Create a Key.
  2. Keep the default settings for Symmetric and Encrypt and decrypt. Click Next.
  3. Type an Alias for the key. Click Next.
  4. Select one or more Key administrators and click Next.
  5. Select one or more IAM Users and/or Roles to grant access to the key. Note: The SDK User IAM Role must have GenerateDataKey and Encrypt permissions. The Fargate Task and/or Batch job roles must have Decrypt permission.
  6. Click Next, then click Finish.

Make a note of the Key ID. This parameter is required as part of the training session definition.

Set up integrate.ai components

# Create a repository for the client
aws ecr create-repository --repository-name iai_client

# Create a repository for the server
aws ecr create-repository --repository-name iai_fl_server

# Log in and upload the Docker images
aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com

# Tag and push the client image
docker tag 919740802015.dkr.ecr.<AWS_REGION>.amazonaws.com/edge/fl-client:<version> <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>
docker push 919740802015.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>

#Tag and push the server image
docker tag 919740802015.dkr.ecr.<AWS_REGION>.amazonaws.com/edge/iai_fl_server:<version> <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_fl_server:<version>
docker push <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_fl_server:<version>

Install the integrate.ai CLI tool, the SDK, and the client. See Installing the SDK for details.

Push the IAI client Docker image to an AWS ECR repository. See the AWS ECR documentation for detailed instructions for setting up ECR, then upload the integrate.ai client Docker image.

Note: The client and server versions change as updates are released. Make sure that you are always uploading the latest version by specifying the correct version number.

Set up the dataset

aws s3 cp </path/to/data> s3://<your.aws.data.bucket> --recursive

Create one or more S3 buckets to contain the dataset(s). The URL for the dataset is a required parameter for the SDK.

If necessary, copy the data to your S3 bucket, as shown in the example.

Note: Only S3 is supported.

Create AWS Batch Roles and Policies

This guide describes in brief how to create and manage roles and policies through the AWS IAM service console. The JSON configuration is also provided for those using Terraform or other tools.

You can create roles from the Roles link under Access Management in IAM.

Note: For the sample code that follows, replace any variable placeholders (such as ) with the correct information for your environment before attempting to use it.

EC2 Instance Role

{
    "Version": "2012-10-17",
    "Statement": [
      {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": "batch.amazonaws.com"
          }
      }
    ]
}

arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
  1. Click Create Role.
  2. Select AWS Service.
  3. Select EC2.
  4. Click Next.
  5. In the Permissions policies search box, type AmazonEC2ContainerServiceforEC2. This is the ARN.
  6. Select this policy and click Next.
  7. Provide a meaningful name for the role.
  8. Review the configuration and compare it to the example below.
  9. Click Create role.

Batch Service Role

  1. Create an associated instance profile and batch service role.
  2. Click Create Role.
  3. Select AWS Service.
  4. Select Batch from the drop-down list, then select Batch.
  5. Click Next.
  6. On the Permissions policies page, click Next.
  7. Provide a meaningful name for the role.
  8. Review the configuration and compare it to the example below.
  9. Click Create role.

ECS Task Role (Job Role) with CloudWatch and S3 policies

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowLogGroupAndStreamAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:DescribeLogStreams",
                "logs:PutLogEvents",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:<AWS_REGION>:<AWS_ACCOUNT>:log-group:/iai-client/batch:log-stream:*"
            ]
        }
    ]
}

This role requires access to the S3 bucket containing your data. It requires the ECS task role policy and an S3 access policy.

  1. Click Create Role.
  2. Select AWS Service.
  3. Select Elastic Container Service from the drop-down list.
  4. Select Elastic Container Service Task.
  5. Click Next.
  6. On the Add Permissions page, click Create policy. This opens a Policies page in a second browser window to allow you to create policies and return to your role to attach them.
  7. On the Create policy page, select the JSON tab and paste the following, with your <AWS region> and <AWS account> filled in.
  8. Click Next: Tags, then click Next: Review.
  9. Provide a meaningful name for the policy.
  10. Click Create policy.

Add an S3 policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3BucketReadAccess",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket", 
                "s3:GetEncryptionConfiguration",
                "s3:GetObject",
                "s3:GetObjectAcl",
                "s3:ListBucketVersions"
            ],
            "Resource": [
                "arn:aws:s3:::<your.aws.data.bucket>",
                "arn:aws:s3:::<your.aws.data.bucket>/*"
            ]
        }
    ]
}
  1. Click Create policy.
  2. On the Create policy page, select the JSON tab and paste the example, with the name of <your.aws.data.bucket> filled in.

The S3 policy on the ECS task role restricts the Job Definition to the S3 buckets that are referenced. To restrict data access further, consider creating different Job Definitions that access different S3 buckets.

  1. Click Next: Tags, then click Next: Review.
  2. Provide a meaningful name for the policy.
  3. Click Create policy.

Add policies to the role

  1. Return to the Add permissions page in the previous browser window.
  2. Use the Permissions policies search box to search for and select the names of the two policies you just created.
  3. Click Next.
  4. Provide a meaningful name for the role, such as ecs-task-role.
  5. Click Create role.

ECS Execution Role with ECR, CloudWatch, and SSM policies

{
   "Version": "2012-10-17",
   "Statement": [
     {
       "Effect": "Allow",
       "Action": [
         "ecr:GetAuthorizationToken",
         "ecr:BatchCheckLayerAvailability",
         "ecr:GetDownloadUrlForLayer",
         "ecr:BatchGetImage",
         "logs:CreateLogStream",
         "logs:PutLogEvents"
       ],
       "Resource": "*"
     }
   ]
 }

This role gives the batch job access to ECR and SSM. It requires the ECS task role policy and ECR, CloudWatch, and SSM policies.

  1. Click Create Role.
  2. Select AWS Service.
  3. Select Elastic Container Service from the drop-down list.
  4. Select Elastic Container Service Task.
  5. Click Next.
  6. In the Permissions policies search box, type AmazonECSTaskExecutionRolePolicy.
  7. Select this policy, then click Next.
  8. Provide a meaningful name for the role, such as ecs-execution-role.
  9. Scroll down to Step 2: Add permissions and click Edit.
  10. Click Create policy.
  11. On the Create policy page, select the JSON tab and paste the example.
  12. Click Next: Tags and then click Next: Review.
  13. Provide a meaningful name for the policy.
  14. Click Create policy.
  15. In the Create role browser window, in the Permissions policies search box, type the name of the policy you just created. Note: you may have to click the Refresh button beside the Create policy button to make the new policy appear.
  16. Select the policy.
  17. Click Next.
  18. Repeat the process above to add an SSM policy. Fill in your <AWS region> and <AWS account> information.
# ECS Execution Role SSM Policy
{
     "Version": "2012-10-17",
     "Statement": [
         {
             "Sid": "SSMAccessForIAIToken",
             "Effect": "Allow",
             "Action": [
                 "ssm:DescribeParameters",
                 "ssm:GetParameter",
                 "ssm:GetParameters"
             ],
             "Resource": [
                 "arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/iai-token"
             ]
         }
     ]
 }
  1. Add the policy to your ECS Execution role.
  2. Click Next.
  3. Click Create Role.

SDK User Role

# Example IAM policy for SDK User Role
{
    "Statement": [
        {
            "Action": [
                "batch:SubmitJob"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-queue/iai-client-batch-job-queue",
                "arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-queue/iai-client-batch-job-queue/*",
                "arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-definition/iai-client-batch-job"
            ],
            "Sid": "AllowUserBatchJobSubmission"
        },
        {
            "Action": [
                "batch:DescribeJobs"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ],
            "Sid": "AllowUserDescribeBatch"
        },
        {
            "Action": [
                "ssm:DescribeParameters",
                "ssm:GetParameter"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/iai-token"
            ],
            "Sid": "SSMAccessForIAIToken"
        }
    ],
    "Version": "2012-10-17"
}

The role and permissions required to set up and configure the environment are different than the permissions required to start an integrate.ai client batch job using the integrate.ai SDK.

The end user of the SDK requires, at minimum, permission to submit a batch job using the job queue and job definition created earlier, and permission to describe jobs.

Any additional permissions required depend on your application. For example, you may give the user the ability to access the SSM parameter set earlier, so that they can pull the integrate.ai access token and use it to manage sessions with the integrate.ai SDK.

Create Fargate roles and policies

Fargate Execution Role

# Fargate Execution role
{
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "",
        "Effect": "Allow",
        "Principal": {
            "Service": "ecs-tasks.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
        }
    ]
}

Create an execution role to allow the batch job to access ECR and SSM.

In the IAM console, create an AWS Service role. Use the drop-down menu to select Elastic Container Service as the Use case, and select Elastic Container Service Task.

Under Permissions policies, select AmazonECSTaskExecutionRolePolicy.

Create a custom policy using the JSON below and add it to the execution role.

# Fargate Execution role SSM policy 
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SSMAccessForTokens",
            "Effect": "Allow",
            "Action": [
                "ssm:DescribeParameters",
                "ssm:GetParameter",
                "ssm:GetParameters"
            ],
            "Resource": [
                "arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
            ]
        }
    ]
}

Note: The SSM policy uses a wildcard (*) in the token name to allow for flexible token use for this task.

Fargate Task Role

{
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "",
        "Effect": "Allow",
        "Principal": {
            "Service": "ecs-tasks.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
        }
    ]
}

This task role enables Fargate to access CloudWatch for logging, and the VPC.

In the IAM console, create an AWS Service role. Use the drop-down menu to select Elastic Container Service as the Use case, and select Elastic Container Service Task.

This role requires policies for four services: CloudWatch, VPC, S3, and SSM.

Create a policy with the following JSON for CloudWatch.

Fargate Task role CloudWatch policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowLogGroupAndStreamAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:DescribeLogStreams",
                "logs:PutLogEvents",
                "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:<AWS_REGION>:<AWS_ACCOUNT>:log-group:/ecs/fl-server-fargate:*"
            ]
        }
    ]
}

Fargate Task role VPC policy

The VPC policy enables the IAI server to retrieve public IPs for service discovery registration.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowVPC",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": ["*"]
        }
    ]
}

Fargate Task role S3 policy

The Fargate Task role uses the S3 policy to store models in S3 after they have been trained.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3BucketAccess",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetEncryptionConfiguration",
                "s3:GetObjectAcl",
                "s3:ListBucketVersions",
                "s3:*Object"
            ],
            "Resource": [
                "arn:aws:s3:::<path to your data>",
                "arn:aws:s3:::<path to your data>/*"
            ]
        }
    ]
}

Fargate Task role SSM policy

The SSM policy allows access to required tokens.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SSMAccessForTokens",
            "Effect": "Allow",
            "Action": [
                "ssm:DescribeParameters",
                "ssm:GetParameter",
                "ssm:GetParameters"
            ],
            "Resource": [
                "arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
            ]
        }
    ]
}

Note: The SSM policy uses a wildcard (*) in the token name to allow for flexible token use for this task.

SDK User IAM Role

The SDK user requires permission to describe, start, and run an ECS Fargate task using the job definition provided. Any further permissions granted to the user depends on your application.

{
    "Statement": [
        {
            "Action" : [
                "ecs:DescribeContainerInstances",
                "ecs:DescribeTasks",
                "ecs:DescribeTaskDefinition",
                "ecs:ListTasks",
                "ecs:UpdateContainerAgent",
                "ecs:StartTask",
                "ecs:StopTask",
                "ecs:RunTask"
            ],
            "Effect" : "Allow",
            "Resource": ["arn:aws:ecs:<AWS_REGION>:<AWS_ACCOUNT>:cluster/iai-server-ecs-cluster",
            "arn:aws:ecs:<AWS_REGION>:<AWS_ACCOUNT>:task-definition/iai-server-fargate-job:*"]
        }, {
            "Sid" : "SSMAccessForTokens",
            "Effect" : "Allow",
            "Action" : [
                "ssm:DescribeParameters",
                "ssm:GetParameter",
                "ssm:PutParameter
            ],
            "Resource" : [
                "arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
            ]
        }, {
          "Sid": "PassRole",
          "Effect": "Allow",
          "Action": [
            "iam:PassRole"
          ],
          "Resource": [
            <fargate_task_role.arn>,
            <fargate_execution_role.arn>,
          ]
        }
    ],
    "Version": "2012-10-17"
}

Role and policy configuration is now complete.

Continue on to set up the AWS Batch job.

Create an AWS Compute Environment

Create a Compute environment for the batch job. For detailed instructions, see the AWS documentation. Configurations specific to the integrate.ai environment are described here.

The name of this environment is a required parameter for the SDK.

We recommend the following defaults for the compute environment.

min_vcpus: 4
max_vcpus: 256
instance_type: ["c4.4xlarge", "m4.2xlarge", "r4.2xlarge"]
type: "EC2"

When using Batch with AWS Fargate, add egress to port 9999 from port 443.

Create an AWS Batch Job Queue

name: "<iai-client-batch-job-queue>"
state: "ENABLED"
priority: 1

Create a Job queue. The name of this queue is a required parameter for the SDK.

The queue must be associated with the compute environment that you created or configured earlier.

There are no additional roles or policies required for the queue.

Create a Batch job definition

{
    "image": "<AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>",
    "vcpus": 1,
    "memory": 60000,
    "command": [
      "hfl",
      "Ref::task",
      "--token",
      "Ref::token",
      "--session-id",
      "Ref::sessionId",
      "--train-path",
      "Ref::trainPath",
      "--test-path",
      "Ref::testPath",
      "--batch-size",
      "Ref::batchSize",
      "--instruction-polling-time",
      "Ref::pollingTime",
      "--log-interval",
      "Ref::logInterval",
      "--approve-custom-package"
    ],
    "parameters": { "task": "train" },
    "jobRoleArn": "<ECS_JOB_ROLE_ARN>",
    "executionRoleArn": "<ECS_EXECUTION_ROLE_ARN>"
    "volumes": [],
    "mountPoints": [],
    "ulimits": [
      {
        "name": "nofile",
        "hardLimit": 10240,
        "softLimit": 10240
      }
    ],
    "secrets": [{
      "name": "IAI_TOKEN",
      "valueFrom": "<iai-token>"
    }],
    "resourceRequirements": []
}

Create a Batch Job definition. The name of this definition is a required parameter for the SDK.

In the definition, you must specify the following:

Set up the Fargate environment

You can configure an AWS Fargate environment through the console UI, or through tools such as Terraform. The required components are an ECS cluster, security and log groups, and a job definition.

ECS cluster

resource "aws_ecs_cluster" "iai_server_ecs_cluster" {
  name = "iai-server-ecs-cluster"

  tags = {
    Name = "IAI Server ECS Cluster"
  }
}

There are no integrate.ai-specific settings required for the ECS cluster. You can use the default configuration.

EC2 Security Group

resource "aws_security_group" "iai_server_security_group" {
  name        = "iai_server_security_group"
  vpc_id      = <VPC_ID>
  description = "Security group for IAI server ECS cluster"

  tags = {
    Name = "IAI Server Security Group"
  }

  ingress {
    description = "Allow to accept HTTP requests"
    from_port   = 9999
    to_port     = 9999
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    description = "Allow nodes to access external APIs"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

}

The security group manages traffic for the Fargate server resources.

Cloudwatch Log Group

resource "aws_cloudwatch_log_group" "iai_server_fargate_log_group" {
  name = "iai-server-fargate-log-group"

  tags = {
    Name = "IAI Server Log Group"
  }
}

The log group captures and displays the output from task execution.

Fargate Job Definition

{
  "family": "iai-server-fargate-job",
  "type": "container",
  "network_mode": "awsvpc",
  "requires_compatibilities": ["FARGATE"],
  "cpu": 1024,
  "memory": 2048,
  #Specify the Fargate Execution role
  "execution_role_arn": "<ECS_EXECUTION_ROLE_ARN>",  
  #Specify the Fargate Task role
  "task_role_arn": "<ECS_TASK_ROLE_ARN>",  
  "runtime_platform": {
    "operating_system_family": "LINUX"
  },
  "placement_constraints": [],
  "container_definitions": [{
    "command": [],
    "cpu": 0,
    "disableNetworking": false,
    "entryPoint": [],
    "environment": [],
    "essential": true,
    #Specify the IAI server Docker image. Make sure you update this when the server version updates
    "image": "<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/iai_fl_server:<VERSION>",
    "logConfiguration": {
      "logDriver": "awslogs",
      "options": {
        #Specify the Fargate CloudWatch log group
        "awslogs-group": "<LOG_GROUP_ID>",
        "awslogs-region": "<AWS_REGION>",
        "awslogs-stream-prefix": "ecs"
      }
    },
    "mountPoints": [],
    "name": "iai-server-fargate",
    "portMappings": [
      {
        "containerPort": 9999,
        "hostPort": 9999,
        "protocol": "tcp"
      }
    ],
    "secrets": [{
        "name": "IAI_TOKEN",
        "valueFrom": "iai-token" 
    }],
    "volumesFrom": []
  }]
}

The job definition contains additional configuration.

This completes the configuration and setup for Fargate, Batch, and the integrate.ai components, as well as the roles, policies, and secrets required to run the server and client.

Running AWS Batch jobs with the SDK

AWS Batch provides convenient, scalable, machine learning computing that can be integrated with the SDK.

This tutorial assumes that you have configured the AWS Batch requirements, such as roles and permissions, as described in AWS Batch Manual Setup and that you have installed the SDK.

Use the integrateai_batch_client.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required. You can download the notebook from the sample repo.

Federated learning models are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session. Additional session parameters are required when using AWS Batch.

Specify Batch parameters

# Training and test data paths

train_path1 = "s3://{path to training data}"
train_path2 = "s3://{path to training data}"
test_path = "s3://{path to test data}"

In addition to the session definition, there are AWS Batch-specific parameters required by the SDK to run a batch job.

Specify the path(s) to your training and test data on S3.

# Specify batch environment settings

job_queue='<aws batch job queue name>'
job_def='<aws batch job definition name>'
ssm_token='<name of iai token stored on SSM>'

Specify the name of the job_queue, job_definition, and ssm_token name that you created in Create an AWS Batch Job Queue.

AWS Authentication

# Example of setting temporary AWS credentials

aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'SESSION_TOKEN': os.environ.get("AWS_SESSION_TOKEN"),
    'REGION': os.environ.get("AWS_REGION"),
}

If you are generating temporary AWS Credentials, specify them as in the example. Otherwise, use the default profile credentials, or pass in a Dict of AWS credential values.

Define a training session

# Create a training session

training_session = client.create_fl_session(
    name="Testing notebook",
    description="I am testing a batch job through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_ffnet",
    model_config=<model_config>,
    data_config=<data_schema>,
)

Prepare your model configuration and data schema. See Model Packages for information on the models available out-of-the-box in integrate.ai, or see Building a Custom Model for information on building your own model.

Define your training session as usual. The session definition is passed to the batch job through the task group that contains the tasks for the batch.

Specify the model_config and data_config names for the configuration and schema that you want to use.

Running Batch with a task group and taskbuilder

# Import the required functions.

from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup

Instead of running the integrate.ai client directly, import and use the taskgroup and taskbuilder functions. For each task, create a task object. Use the taskgroup to add each task to the batch.

One task is equivalent to one client in integrate.ai terms. The min_num_clients given in the training session definition must match the number of tasks defined in the batch.

# Create a taskbuilder object, and provide the required parameters.

tb = taskbuilder_aws.batch(ssm_token_parameter_key=ssm_token, 
    job_queue=<job_queue>,
    aws_credentials=<aws_creds>,
    cpu_job_definition=<job_def>)

Create a task group and start the batch

task_group_context = SessionTaskGroup(training_session)\
    .add_task(tb.hfl
        (train_path=train_path1, 
        test_path=test_path, 
        session=training_session, 
        vcpus='2', 
        memory='16384')
        )\
    .add_task(tb.hfl
        (train_path=train_path2, 
        test_path=test_path, 
        session=training_session, 
        vcpus='2', 
        memory='16384')
    ).start()

The task group defines the training_session and the tasks to run for the batch. The example code snippet creates the task group and starts the job.

The vcpu and memory parameters are optional. Use them to adjust the values in the job definition if necessary.

Monitor sumbitted Batch jobs

The task group context contains the session ID.

# Print the session ID

print(task_group_context.session_id)

# Monitor the status

task_group_status = task_group_context.status()
for task_status in task_group_status:
    print(task_status)

# Wait for session completion

task_group_conext.wait(300)

You can also review the status of the jobs in the AWS Console.

Running a training server on AWS Fargate

If this is your first time running a training session with a server in AWS Fargate, configure the Fargate environment, as described in AWS Batch and Fargate Manual Setup.

To follow along with this tutorial, open the integrateai_fargate_server.ipynb notebook.

Model config and data schema

# Example model configuration
model_config = {
    "experiment_name": "test_synthetic_tabular",
    "experiment_description": "test_synthetic_tabular",
    "strategy": {"name": "FedAvg", "params": {}},
    "model": {"params": {"input_size": 15, "hidden_layer_sizes": [6, 6, 6], "output_size": 2}},
    "balance_train_datasets": False,
    "ml_task": {
        "type": "classification",
        "params": {
            "loss_weights": None,
        },
    },
    "optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
    "differential_privacy_params": {"epsilon": 4, "max_grad_norm": 7},
    "save_best_model": {
        "metric": "loss",  # to disable this and save model from the last round, set to None
        "mode": "min",
    },
    "seed": 23,  # for reproducibility
}

Prepare your model configuration and data schema. See Model Packages for information on the models available out-of-the-box in integrate.ai, or see Building a Custom Model for information on building your own model. A generic example that matches the sample notebook is provided below.

# Example data schema 
data_schema = {
    "predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
    "target": "y",
}

Create a training session

training_session = client.create_fl_session(
    name="Fargate Testing notebook",
    description="I am testing session creation with Fargate through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_ffnet",
    model_config=model_config,  
    data_config=data_schema,
).start()

training_session.id # Print the training session ID for reference

This example session uses 2 clients and 2 rounds. The training_session definition is passed to the server as part of the task definition.

Note: If you are using a custom model, ensure that you specify the correct model_config and data_schema names.

Specifying optional AWS Credentials

If you are generating temporary AWS credentials, specify them here. Otherwise use the default profile credentials.

# Example of specifying temporary AWS credentials 

aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'SESSION_TOKEN': os.environ.get("AWS_SESSION_TOKEN"),
    'REGION': os.environ.get("AWS_REGION"),
}

Specify the Fargate Cluster, Task Definition Name and Network Parameters

# Specify the name of your cluster, task definition and network parameters

fargate_cluster = '<fargate_cluster_name>'
task_def = '<fargate_task_definition>'
subnet_id = '<vpc_network_subnet_id>'
security_group = '<iai_server_security_group>'
train_path1 = '{train path 1}'
train_path2 = '{train path 2}'
test_path = '{test path}'
job_queue= '<fargate-job-queue>'
job_def='<iai-server-fargate-job>'
model_storage='{model storage path}'

Ensure that you have configured the cluster, task definition, and network parameters on AWS first, then specify them as variables for the SDK.

With the credentials and variables defined, you can now use the SDK to run the training server on AWS Fargate.

Run the training server

# Create a Fargate task builder object

from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws

tb = taskbuilder_aws.fargate(
    aws_credentials=<aws_creds>,
    cluster=<fargate_cluster>,
    task_definition=<task_def>)

The SDK provides a taskgroup and taskbuilder object to simplify the process of creating and managing Fargate and AWS Batch tasks.

# Create an AWS Batch task builder object

tb_batch = taskbuilder_aws.batch( 
    aws_credentials=<aws_creds>,
    job_queue=<job_queue>,
    cpu_job_definition=<job_def>)

Create a taskgroup

from integrate_ai_sdk.taskgroup.base import SessionTaskGroup

task_group_context = SessionTaskGroup(training_session)
    .add_task(tb.fls(subnet_id, security_group, ssm_token_key, storage_path=model_storage))
    .add_task(tb_batch.hfl(train_path=train_path1, test_path=test_path, vcpus='2', memory='16384'))
        .add_task(tb_batch.hfl(train_path=train_path2, test_path=test_path, vcpus='2', memory='16384')
        ).start()

The taskgroup starts the server and the batch.

Here we are creating a session task group that takes as input the training_session created earlier. The first task added (tb) starts the server. The tb_batch task is added twice - once for each client.

Tip: See Create a training session to review the session definition.

You can monitor the running server to check training progress.

task_group_context.wait(300)

REFERENCE

Setting up a Docker GPU Environment (Optional)

Set up a GPU environment if you are building a model with image or video data.

Prerequisites

Linux/MacOS Setup

  1. Install CUDA driver or CUDA toolkit:
  1. Install NVidia container toolkit:

Windows Setup

  1. Ensure that intel VT-x or AMD SVM is enabled in BIOS, check the motherboard manufacture document for exact steps.
  2. Install CUDA driver or CUDA toolkit:
  3. Install cuda toolkit (which include driver, but also contains other unnecessary components)
    • In the component selection screen, you can choose to install only the CUDA driver
  4. Install CUDA driver only
    • Go to Nvidia driver page, select the corresponding graphic card and operating system, download and install the driver.
  5. Setup Docker with WSL2.

Running Docker container with GPU device

Add --gpus all option to the docker run command.

Example: docker run --gpus all -it -d --name $SILO_NAME -v <data_path>:/root/demo 919740802015.dkr.ecr.ca-central-1.amazonaws.com/edge/fl-client:latest

Model Packages

Feed Forward Neural Nets (iai_ffnet)

Feedforward neural nets are a type of neural network in which information flows the nodes in a single direction.

Examples of use cases include:

HFL FFNet

The iai_ffnet model is a feedforward neural network for horizontal federated learning (HFL) that uses the same activation for each hidden layer.

This model only supports classification and regression. Custom loss functions are not supported.

Privacy

DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.

VFL SplitNN

integrate.ai also supports the SplitNN model for vertical federated learning (VFL). In this model, neural networks are trained with data across multiple clients. A PRL (private-record linking) session is required for all datasets involved. There are two types of sessions: train, and predict. To make predictions, the PRL session ID and the corresponding training session ID are required.

For more information, see PRL Session and VFL SplitNN Model Training.

Generalized Linear Models (GLMs)

This model class supports a variety of regression models. Examples include linear regression, logistic regression, Poisson regression, gamma regression and inverse Gaussian regression models. We also support regularizing the model coefficients with the elastic net penalty.

Examples of use cases include [1]:

The iai_glm model trains generalized linear models by treating them as a special case of single-layer neural nets with particular output activation functions.

Privacy

DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.

References [1]: https://scikit-learn.org/stable/modules/linear_model.html#generalized-linear-regression

Strategy Library

Reference guide for available training strategies.

FedAvg

Federated Averaging strategy implemented based on https://arxiv.org/abs/1602.05629

Parameters

FedAvgM

Federated Averaging with Momentum (FedAvgM) strategy https://arxiv.org/pdf/1909.06335.pdf

Parameters

Uses the same parameters as FedAvg as well as the following:

FedAdam

Adaptive Federated Optimization using Adam (FedAdam) https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedAdagrad

Adaptive Federated Optimization using Adagrad (FedAdagrad) strategy https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedOpt

Adaptive Federated Optimization (FedOpt) abstract strategy https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedYogi

Federated learning strategy using Yogi on server-side https://arxiv.org/abs/2003.00295v5

Parameters

Evaluation metrics

Metrics are calculated for each round of training.

When the session is complete, you can see a set of metrics for all rounds of training, as well as metrics for the final model.

Retrieve Metrics for a Session

Use the SessionMetrics class of the API to store and retrieve metrics for a session. You can retrieve the model performance metrics as a dictionary (Dict), or plot them. See the API Class Reference for details.

Typical usage example:

client = connect("token") 

already_trained_session_id = "<sessionID>"

session = client.fl_session(already_trained_session_id)

# retrieve the metrics for the session as a dictionary
metrics = session.metrics.as_dict()
  1. Authenticate to and connect to the integrate.ai client.
  2. Provide the session ID that you want to retrieve the metrics for as the already_trained_session_id`.
  3. Call the SessionMetrics class.

Available Metrics

The Federated Loss value for the latest round of model training is reported as the global_model_federated_loss(float) attribute for an instance of SessionMetrics.

This is a model level metric reported for each round of training. It is a weighted average loss across different clients, weighted by the number of examples/samples from each silo.

See the metrics by machine learning task in the following table:

Classification and Logistic Regression and Normal Poisson, Gamma, Inverse Gaussian
Loss (cross-entropy) Loss (mean squared score) Loss (unit deviance)
ROC_AUC R2 score R2 score
Accuracy

Building a Custom Model

If the standard integrate.ai models (FFNet, GLM, GBM) do not suit your needs, you can create a custom model. If you are working with non-tabular data, you can also create a custom dataloader to use with your custom model.

integrate.ai supports all custom models under pytorch (for example CNNs, LSTMs, Transformers and GLMs).

Most supervised learning tasks with models that only include DP-compatible modules (including both natively compatible or can be converted to compatible substitute) are supported. * Some modules (for example, LSTM) are not compatible with DP-SGD out-of-the-box and must be replaced by their DP equivalent. * BatchNorm is not supported due to the nature of DP-SGD and must be replaced by alternative modules.

Customization is restricted in the following ways:

Download template and examples

Start by cloning the integrateai-samples repo. This repo contains everything you need to learn about and create your own custom model package. Review these examples and the API documentation before getting started.

The repo contains several folders, some of which are relevant to custom models:

Create a custom model package

Using the template files provided, create a custom model package.

Follow the naming convention for files in the custom package: no spaces, no special characters, no hyphens, all lowercase characters.

  1. Create a folder to contain your custom model package. For this tutorial, this folder is named myCustomModel, and is located in the same parent folder as the template folder.

Example path: C:<workspace>\integrate_ai_sdk\sample_packages\myCustomModel

  1. Create two files in the custom model package folder: a. model.py - the custom model definition. You can rename the template_model.py as a starting point for this file. b. <model-class-name>.json - default model inputs for this model. It must have the same name as the model class name that is defined in the model.py file.

If you are using the template files, the default name is templatemodel.json.

  1. Optional: To use a custom dataloader, you must also create a dataset.py and a dataset configuration JSON file in the same folder. For more information, see #.

If there is no custom dataset file, the default TabularDataset loader is used. It loads .parquet and .csv files, and requires predictors: ["x1", "x2"], target: y as input for the data configuration. This is what is used for the standard models.

The example below provides the boilerplate for your custom model definition. Fill in the code required to define your model. Refer to the model.py files provided for the lstmTagger and cifar10_vgg16 examples if needed.

from integrate_ai_sdk.base_class import IaiBaseModule

class TemplateModel(IaiBaseModule):
    def __init__(self):
        """
        Here you should instantiate your model layers based on the configs.
        """
        super(TemplateModel, self).__init__()

    def forward(self):
        """
        The forward path of a model. Can take an input tensor and return a prediction tensor
        """
        pass

if __name__ == "__main__":
    template_model = TemplateModel()

Custom model configuration inputs

Create a JSON file that defines the model inputs for your model.

It must have the same name as the model class name that is defined in the model.py file (e.g. templatemodel.json). The content of this file is dictated by your model.

Tip: File paths can be specified as S3 URLs.

The following parameters are required:

Parameter (type) Description
experiment_name (string) The name of your experiment.
experiment_description (string) A description of your experiment.
strategy (object) The federated learning strategy to use and any required parameters. Supported strategies are: FedAvg, FedAvgM, FedAdam, FedAdagrad, FedOpt, FedYogi. See for details.
model (object) The model type and any required parameters.
ml_task The machine learning task type and any required parameters. Supported types are: regression, classification, logistic, normal, Poisson, gamma, and inverseGaussian.
optimizer (object) The optimizer and any required parameters. See the https://pytorch.org/docs/stable/optim.html package description for details.
differential_privacy_params (number) The privacy budget. Larger values correspond to less privacy protection and potentially better model performance.

Example: FFNet model using a FedAvg strategy

{
    "experiment_name": "Test session",
    "experiment_description": "This is a test session",
    "job_type": "training",
    "strategy": {
        "name": "FedAvg",
        "params": {}
        },
    "model": {
        "type": "FFNet",
        "params": {
            "input_size": 200,
            "hidden_layer_sizes": [80,40,8],
            "output_size": 2,
            "hidden_activation": "relu"
            }
    },
    "ml_task": "classification",
    "optimizer": {
        "name": "SGD",
        "params": {
            "learning_rate": 0.03,
            "momentum": 0
        }
    },
    "differential_privacy_params": {
        "epsilon": 1,
        "max_grad_norm": 7,
        "delta": 0.000001
    },
    "eval_metrics": [
        "accuracy",
        "loss",
        "roc_auc"
    ]
}

Reference model validation schema

Below is the outline for the full schema used to validate the model configuration inputs for GLM and FFNet models. This schema is provided for reference.

{
  "$schema": "https://json-schema.org/draft-07/schema#",
  "title": "FL Model Config",
  "description": "The model config for an FL model",
  "type": "object",
  "properties": {
    "experiment_name": {
      "type": "string",
      "description": "Experiment Name"
    },
    "experiment_description": {
      "type": "string",
      "description": "Experiment Description"
    },
    "strategy": {
      "type": "object",
      "properties": {
        "name": {
          "enum": [
            "FedAvg"
          ],
          "description": "Name of the FL strategy"
        },
        "params": {
          "type": "object",
          "properties": {
            "fraction_fit": {
              "type": "number",
              "minimum": 0,
              "maximum": 1,
              "description": "Proportion of clients to use for training. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
            },
            "fraction_eval": {
              "type": "number",
              "minimum": 0,
              "maximum": 1,
              "description": "Proportion of clients to use for evaluation. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
            },
            "accept_failures": {
              "type": "boolean",
              "description": "Whether to accept failures during training and evaluation. If False, the training process will be stopped when a client fails, otherwise, the failed client will be ignored."
            }
          },
          "additionalProperties": false
        }
      },
      "required": [
        "name",
        "params"
      ]
    },
    "model": {
      "type": "object",
      "description": "Model type and parameters",
      "properties": {
        "params": {
          "type": "object",
          "description": "Model parameters"
        }
      },
      "required": [
        "params"
      ]
    },
    "ml_task": {
      "type": "object",
      "description": "Type of ML task",
      "properties": {
        "type": {
          "enum": [
            "regression",
            "classification",
            "logistic",
            "normal",
            "poisson",
            "gamma",
            "inverseGaussian"
          ]
        },
        "params": {
          "type": "object"
        }
      },
      "required": ["type", "params"],
      "allOf": [
        {
          "if": {
            "properties": { "type": { "enum": ["regression", "classification"] } }
          },
          "then": {
            "properties": { "params": { "type": "object" } }
          }
        },
        {
          "if": {
            "properties": { "type": { "enum": [
              "logistic",
              "normal",
              "poisson",
              "gamma",
              "inverseGaussian"
            ] } }
          },
          "then": {
            "properties": { "params": {
              "type": "object",
              "properties": {
                "alpha": {
                  "type": "number",
                  "minimum": 0
                },
                "l1_ratio": {
                  "type": "number",
                  "minimum": 0,
                  "maximum": 1
                }
              },
              "required": ["alpha", "l1_ratio"]
            } }
          }
        }
      ]
    },
    "optimizer": {
      "type": "object",
      "properties": {
        "name": {
          "enum": [
            "SGD"
          ]
        },
        "params": {
          "type": "object",
          "properties": {
            "learning_rate": {
              "type": "number",
              "minimum": 0,
              "description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
            },
            "momentum": {
              "type": "number",
              "minimum": 0,
              "description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
            }
          },
          "required": [
            "learning_rate"
          ],
          "additionalProperties": false
        }
      },
      "required": [
        "name",
        "params"
      ],
      "additionalProperties": false
    },
    "differential_privacy_params": {
      "type": "object",
      "properties": {
        "epsilon": {
          "type": "number",
          "minimum": 0,
          "description": "Privacy budget. Larger values correspond to less privacy protection, and potentially better model performance."
        },
        "max_grad_norm": {
          "type": "number",
          "minimum": 0,
          "description": "The upper bound for clipping gradients. A hyper-parameter to tune."
        }
      },
      "required": [
        "epsilon",
        "max_grad_norm"
      ]
    },
    "eval_metrics": {
      "description": "A list of metrics to use for evaluation",
      "type": "array",
      "minItems": 1,
      "items": {}
    }
  },
  "required": [
    "experiment_name",
    "experiment_description",
    "strategy",
    "model",
    "ml_task",
    "optimizer",
    "differential_privacy_params"
  ]
}

Test and upload the custom model

Before you start training your custom model, you should test it and upload it to your workspace. The method for uploading also tests the model by training a single epoch locally. After the model has been successfully uploaded, you or any user with access to the model can train it in a session.

To test and upload a custom model, use the upload_model method:

def upload_model(
    self,
    package_path: str,
    dataset_path: str,
    package_name: str,
    sample_model_config_path: str,
    sample_data_config_path: str,
    batch_size: int,
    task: str,
    test_only: bool,
    description: str
):

where:

Argument (type) Description
package_path (str) Path to your custom model folder
dataset_path (str) Path to the dataset(s)
package_name (str) Name of the custom model package. It must be unique from other previously uploaded package names.
sample_model_config_path (str) Path to the model configuration JSON file
sample_data_config_path (str) Path to the dataset configuration JSON file
batch_size (int) Number of samples to propagate through the network at a time
task (str) Either 'classification' or 'regression'. Set it to 'regression' for numeric and 'classification' for categorical target.
test_only (bool) If set to True, perform one epoch training to test the model without uploading it. If set to False, tests and uploads the model if the test passes.
description (str) Can be set with maximum 1024 characters to describe the model. This description also appears in the integrate.ai web portal.

This method tests a custom model by creating the model based on the custom model configuration (JSON file) and then training it with one epoch locally. If the model fails the test, it cannot be uploaded.

When starting a session with your custom model, make sure you specify the correct package_name, model_config, and data_config file names. For details, see create_fl_session in the API documentation.

User Authentication

Sharing access to training sessions and shared models in a simple and secure manner is a key requirement for many data custodians. integrate.ai provides a secure method of authenticating end users with limited permissions through the SDK to enable privileged access.

As the user responsible for managing access through the integrate.ai platform, you have the ability to generate an unscoped API token through the integrate.ai UI. Unscoped API tokens provide full access to the integrate.ai SDK. You can run client training tasks locally, or on remote data.

In the case that you want to create a token that has limited access, to enforce governance standards or provide an end user of your platform with limited access to the integrate.ai SDK, you can create scoped API tokens. Scoped tokens grant limited permissions, which enables you to control the level of access to trained sessions and models.

In the UI, you can view your personal API tokens as well as all scoped API tokens created in your organization's workspace through the SDK. These scoped user tokens are designed for use with the integrate.ai SDK. Tokens are tied to user identities through a unique ID, which is logged with each user action.

Limiting user access by token greatly reduces the security risk of leaked credentials. For example, with an unscoped API token, a user could run tasks on a remote machine, where there is a risk that it could be leaked or exposed because it is shared in an outside (non-local) environment. To mitigate that risk, you can instead provide the user with a scoped token that has limited permissions and a short lifespan (maximum 30 days).

Create an unscoped token

As the user who manages other users' access, you must first create your own unscoped token.

  1. Log in to your workspace through the portal.
  2. On the Dashboard, click Generate Access Token.
  3. Copy the acccess token and save it to a secure location.

Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.

Install components

Install the integrate.ai command-line tool (CLI), the SDK, and the client. For detailed instructions, see [##install-integrate-ai-sdk-and-client].

Open sample notebook

Open the Authentication sample notebook (integrateai_auth.ipynb) located in the SDK package.

...integrate_ai_sdk/src/integrate_ai_sdk/sample_packages/sample_notebook/

The notebook provides sample code that demonstrates how to use the SDK to generate users and tokens.

Create a user

from integrate_ai_sdk.auth import connect_to_auth_client
from integrate_ai_sdk.auth.scopes import Scope
import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")
auth_client = connect_to_auth_client(token=IAI_TOKEN)

Create a connection to the auth client with your unscoped token.

user_name = '{user_name}'
user = auth_client.create_user(user_name)

Create a user. Specify a user name (for example, demo-user, or user1@mycompany.com).

The SDK generates a unique ID for the user in the integrate.ai platform.

Example output:

01/27/2023 11:20:24:INFO:Machine user demo-user(f1bd9ff87c@integrate.ai) created by <your-email>. 01/27/2023 11:20:24:INFO:User activated.

Create a scoped token for the user

token = auth_client.create_token(user_id=user_name, scopes=[Scope.create_session, Scope.read_user_session])
print(token)p

Create a scoped token for the user. Include only the scopes that the user requires to work with the system and their data.

This request returns the unique user ID (the generated email), a list of the granted scopes, and the token, as well as the token ID and the user name.

Copy and save the token somewhere secure to share with the user.

Available Scopes

Verify user and token through the SDK

The token_info command allows you to inspect the details of a token. You must specify the token to inspect.

auth_client.token_info(token['token'])

Example output:

{'customer': 'your-environment-name', 'email': 'generated-email@integrate.ai', 'realm': 'ifl', 'role': 'admin', 'env': 'prod', 'token_id': '55a19b5d077d40a798aa51ace57168c3', 'iss': 'integrate.ai', 'iat': 1674832855, 'renewable': True, 'scope': 'create_session read_model read_session read_user_session', 'user_id': 'demo-user', 'user_type': 'generated', //Indicates whether the user was created through the SDK 'active': True}

Verify user and token through the UI

To confirm that the user and token were created successfully, you can also view them in the web dashboard.

  1. Log in to the web dashboard.
  2. Click Token Management.
  3. Click User Scoped Tokens.
  4. Locate the user name for the user you created.

Revoke a scoped token

User scoped tokens have a default lifespan of thirty (30) days. To revoke a token before it expires, use the revoke_token command in the SDK.

You must provide the token_id for the token that you want to revoke. You can find this ID in the web dashboard.

auth_client.revoke_token(token['token_id'])

Delete a user

Users that you create through the SDK can be deleted through the SDK.

Specify the name of the user that you want to delete.

auth_client.delete_user(user_name)

PRIVACY & SECURITY

Differential privacy

What is differential privacy?

Differential privacy is a technique for providing a provable measure of how “private” a data set can be. This is achieved by adding a certain amount of noise when responding to a query on the data. A balance needs to be struck between adding too much noise (making the computation less useful), and too little (reducing the privacy of the underlying data).

The technique introduces the concept of a privacy-loss parameter (typically represented by ε (epsilon)), which can be thought of as the amount of noise to add for each invocation of some computation on data. A related concept is the privacy budget, which can be chosen by the data curator.

This privacy budget represents the measurable amount of information exposed by the computation before the level of privacy is deemed unacceptable.

The benefit of this approach is that by providing a quantifiable measure, there can be a guarantee about how “private” the release of information is. However, in practice, relating the actual privacy to the computation in question can be difficult: i.e. how private is private enough? What will ε need to be? These are open questions in practice for practitioners when applying DP to an application.

How is Differential Privacy used in integrate.ai?

Users can add Differential Privacy to any model built in integrate.ai. DP parameters can be specified during session creation, within the model configuration.

When is the right time to use Differential Privacy?

Overall, differential privacy can be best applied when there is a clear ability to select the correct ε for the computation, and/or it is acceptable to specify a large enough privacy loss budget to satisfy the computation needs.

PRL Privacy for VFL

Private record linkage is a secure method for determining the overlap between datasets

In vertical federated learning (VFL), the datasets shared by any two parties must have some overlap and alignment in order to be used for machine learning tasks. There are typically two main problems:

  1. The identifiers between the two datasets are not fully overlapped.
  2. The rows of the filtered, overlapped records for the datasets are not in the same order.

To resolve these differences while maintaining privacy, integrate.ai applies private record linkage (PRL), which consists of two steps: determining the overlap (or intersection) and aligning the rows.

Private Set Intersection

First, Private Set Intersection (PSI) is used to determine the overlap without storing the raw data centrally, or exposing it to any party. PSI is a privacy-preserving technique that is considered a Secure multiparty computation (SMPC) technology. This type of technology uses cryptographic techniques to enable multiple parties to perform operations on disparate datasets without revealing the underlying data to any party.

Additionally, integrate.ai does not store the raw data on a server. Instead, the parties submit the paths to their datasets. integrate.ai uses the ID columns of the datasets to produce a hash locally that is sent to the server for comparison. The secret for this hash is shared only through Diffie–Hellman key exchange between the clients - the server has no knowledge of it.

Private Record Alignment

Once the ID columns are compared, the server knows which dataset indices are common between the two sets and can align the rows. This step is the private record alignment portion of PRL. It enables machine learning tasks to be performed on the datasets.

For more information about running PRL sessions, see PRL Sessions.

Release Notes

19 Dec 2023: PCA, VFL-GBM, and Dataset Registration

This release provides several new features:

Version: 9.6.6

28 Aug 2023: Session Credit Usage

This release provides users with the ability to see their credit usage in their workspace UI. Each training or analytic session uses a certain number of credits from the user's allotment. This usage can now be monitored through a graph, with daily details. Users can also request additional credit when needed.

Version: 9.6.2

14 Aug 2023: Azure Task Runners

This release expands the Control Plane system architecture to include Microsoft Azure Task Runners.

Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environements, which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

For more information about task runners and control plane capabilities, see Using integrate.ai.

A tutorial for using Azure task runners is available here.

Version: 9.6.1

14 July 2023: AWS Task Runners

This release introduces the Control Plane system architecture and AWS Task Runners.

Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environements (such as AWS Batch and Fargate), which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

For more information about task runners and control plane capabilities, see Using integrate.ai.

Version: 9.6.0

17 May 2023: 2D Histograms for EDA Intersect & Linear Inference

This release introduces two new features:

New in this release is also the addition of a single release version number to describe the release package. This release is version: 9.5.0.

27 April 2023: PRL, VFL, and EDA Intersect

This release introduces the following new features:

Note: this release does not support Python 3.11 due to a known issue in that Python release.

Versions:

30 Jan 2023: User Authentication

This release added two new features:

Bug fixes:

Versions:

08 Dec 2022: Integration with AWS Fargate

This release introduces the ability to run an IAI training server on AWS Fargate through the integrate.ai SDK. With an integrate.ai training server running on Fargate, your data in S3 buckets, and clients running on AWS Batch, you can use the SDK to manage and run fully remote training sessions. A guide is available here.

Versions:

02 Nov 2022: Integration with AWS Batch

This release introduces the ability to run AWS Batch jobs through the integrate.ai SDK. Building on the previous release, which added support for data hosted in S3 buckets, you can now start and run a remote training session on remote data with jobs managed by AWS Batch. A guide is available here.

Features:

Note: Docker must be running for the version command to return a value.

BREAKING CHANGE:

Session status mapping in the SDK has been updated as follows:

Bug fixes:

Versions:

06 Oct 2022: Exploratory Data Analysis & S3 Support

Features:

Versions:

14 Sept 2022: Infrastructure upgrades for session abstraction

API Documentation

The latest integrate.ai API documentation is always available online:

API Documentation