Welcome to integrate.ai

integrate.ai is the safe data collaboration platform that enables data science leaders to activate data silos by moving models, not data.

Unlock quality data at scale

Collaborate within and across organizations, jurisdictions, and environments without moving data.

Reduce collaboration effort

Efficiently collaborate without copying, centralizing, or transferring data directly.

Protect privacy & trust

Avoid compliance hurdles through a trusted platform with baked-in security, safety, and privacy.

Developer tools for federated learning and analytics

Machine learning and analytics on distributed data require complex infrastructure and tooling that drain valuable developer resources. integrate.ai abstracts away the complexity, making it easier than ever to integrate federated learning and analytics into your product.

System Overview

integrate.ai provides a robust and seamless system that removes the infrastructure and administration challenges of distributed machine learning and enables you to integrate it directly into your existing workflow. With robust privacy-enhancing technologies like federated learning and differential privacy, integrate.ai allows you to train models and run analytics across distributed data without compromising data privacy.

Our platform consists of the following components:

  1. An SDK with a REST API that exposes a full suite of data science tools for privacy-safe analytics and AI across your intelligence network. It also supports notebook-based ad hoc discovery and exploratory analysis.
  2. A Task Server that automatically orchestrates federated analytics and AI tasks across connected data nodes.
  3. Task runners that apply governance controls, make data nodes discoverable, and execute analytics and AI tasks.

Deployment

integrate.ai democratizes usage of the cloud by taking care of all the computational aspects of deployment, from container orchestration and deployment to security and failovers. By harnessing the power of cloud computing and AI, it enables data scientists and R&D professionals with limited-to-no computational and machine learning training to analyse and make sense of their data in the fastest and safest way possible.

The image below illustrates the high-level system architecture and how the managed environments interact.

The task runners use the serverless capabilities of cloud environments (such as AWS Batch and Fargate). This greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

IT Administrator Workflow

The initial setup for your workspace must be performed by an AWS or Microsoft Azure Administrator. This Administrator must have the rights necessary to configure the permissions and access needed by the cloud-based task runners.

  1. Follow the steps for either AWS configuration or Azure configuration to generate the roles and policies or service principal information.
  2. Provide the cloud environment details to the integrate.ai Workspace Administrator.

    a. For AWS, provide the provisioner and runtime role ARNs.

    b. For Azure, provide the Resource group, Service principal ID, Service principal secret, Subscription ID, and Tenant ID.

Alternatively, the IT Admin may choose to take on the Workspace Administrator role in the workspace and configure one or more task runners for their users.

Whitelisting task runner IPs

If your enterprise IT landscape requires ingress and egress exceptions for firewalls, open the necessary ports for the white-listed IPs required by the task runners.

Locate the task runner IPs

After you create a task runner, you can find the IP addresses and port on the task runner details page in the workspace. Workspace Administrators and Model Builders have permission to view these details.

  1. In your integrate.ai workspace, click Settings.
  2. Click Task Runners.
  3. Locate the task runner you need to whitelist in the table and click on that row.
  4. The task runner details page opens.

AWS configuration for task runners

Before you get started using integrate.ai in your cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud by creating a limited-permission role in AWS for the provisioner and for the runtime agent. This is a one-time process: once created, you can use these roles for any task runners in your environment.

This section walks you through the required configuration.

  1. Create a provisioner role and policy.
  2. Create a runtime role and policy.

Create AWS IAM Provisioner Custom trust policy

# Provisioner Custom trust policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::919740802015:role/iai-taskrunner-provision-<workspace-name>-prod-batch-ecs-task-role"
                ]
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ],
            "Condition": {}
        }
    ]
}

You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a limited permission Role in AWS for the provisioner.

Create AWS Provisioner Role and Policy

This policy lists all of the required permissions for integrate.ai to create the necessary infrastructure.

The provisioner creates the following components and performs the required related tasks:

To create the provisioner role and policy:

  1. In the AWS Console, go to IAM, select Policies, and click Create policy.
  2. On the Specify permissions page, click the JSON tab.
  3. Paste in the Provisioner JSON policy provided below by integrate.ai. Do not edit this policy.

    Provisioner JSON policy - do not edit. Click to expand.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "IAMPermissions", "Effect": "Allow", "Action": [ "iam:CreateInstanceProfile", "iam:CreateServiceLinkedRole", "iam:GetInstanceProfile", "iam:RemoveRoleFromInstanceProfile", "iam:DeleteInstanceProfile", "iam:AddRoleToInstanceProfile", "iam:CreatePolicy", "iam:CreateRole", "iam:GetPolicy", "iam:GetRole", "iam:GetPolicyVersion", "iam:ListRolePolicies", "iam:ListAttachedRolePolicies", "iam:ListPolicyVersions", "iam:ListInstanceProfilesForRole", "iam:DeletePolicy", "iam:DeleteRole", "iam:AttachRolePolicy", "iam:PassRole", "iam:DetachRolePolicy" ], "Resource": "*" }, { "Sid": "BatchPermissions", "Effect": "Allow", "Action": [ "batch:RegisterJobDefinition", "batch:DeregisterJobDefinition", "batch:CreateComputeEnvironment", "batch:UpdateComputeEnvironment", "batch:DeleteComputeEnvironment", "batch:UpdateJobQueue", "batch:CreateJobQueue", "batch:DeleteJobQueue", "batch:DescribeComputeEnvironments", "batch:DescribeJobDefinitions", "batch:DescribeJobQueues" ], "Resource": "*" }, { "Sid": "CloudWatchPermissions", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups", "logs:ListTagsLogGroup", "logs:CreateLogGroup", "logs:DescribeLogGroups", "logs:DeleteLogGroup", "logs:TagResource" ], "Resource": "*" }, { "Sid": "ECSFargatePermissions", "Effect": "Allow", "Action": [ "ecs:CreateCluster", "ecs:DescribeClusters", "ecs:DeleteCluster", "ecs:UpdateCluster", "ecs:RegisterTaskDefinition", "ecs:DescribeTaskDefinition", "ecs:DeregisterTaskDefinition" ], "Resource": "*" }, { "Sid": "KmsPermissions", "Effect": "Allow", "Action": [ "kms:ListAliases", "kms:CreateKey", "kms:CreateAlias", "kms:DescribeKey", "kms:GetKeyPolicy", "kms:GetKeyRotationStatus", "kms:ListResourceTags", "kms:ScheduleKeyDeletion", "kms:CreateGrant", "kms:ListGrants", "kms:RevokeGrant", "kms:DeleteAlias" ], "Resource": "*" }, { "Sid": "S3CreatePermissions", "Effect": "Allow", "Action": [ "s3:CreateBucket", "s3:DeleteBucket", "s3:DeleteBucketPolicy", "s3:PutBucketVersioning", "s3:PutBucketPublicAccessBlock", "s3:PutBucketVersioning", "s3:PutEncryptionConfiguration" ], "Resource": "*" }, { "Sid": "S3ReadPermissions", "Effect": "Allow", "Action": [ "s3:GetBucketCors", "s3:GetBucketPolicy", "s3:PutBucketPolicy", "s3:GetBucketWebsite", "s3:GetBucketVersioning", "s3:GetLifecycleConfiguration", "s3:GetAccelerateConfiguration", "s3:GetBucketRequestPayment", "s3:GetBucketLogging", "s3:GetBucketPublicAccessBlock", "s3:GetBucketAcl", "s3:GetBucketObjectLockConfiguration", "s3:GetReplicationConfiguration", "s3:GetBucketTagging", "s3:GetEncryptionConfiguration", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::*integrate.ai", "arn:aws:s3:::*integrate.ai/*" ] }, { "Sid": "VPCCreatePermissions", "Effect": "Allow", "Action": [ "ec2:CreateVpc", "ec2:CreateTags", "ec2:AllocateAddress", "ec2:ReleaseAddress", "ec2:CreateSubnet", "ec2:ModifySubnetAttribute", "ec2:RevokeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:CreateRouteTable", "ec2:CreateRoute", "ec2:CreateInternetGateway", "ec2:AttachInternetGateway", "ec2:AssociateRouteTable", "ec2:ModifyVpcAttribute", "ec2:CreateSecurityGroup", "ec2:CreateNatGateway" ], "Resource": "*" }, { "Sid": "VPCDescribePermissions", "Effect": "Allow", "Action": [ "ec2:DescribeVpcs", "ec2:DescribeSubnets", "ec2:DescribeSubnets", "ec2:DescribeVpcAttribute", "ec2:DescribeVpcClassicLinkDnsSupport", "ec2:DescribeVpcClassicLink", "ec2:DescribeInternetGateways", "ec2:DescribeSecurityGroups", 
"ec2:DescribeSecurityGroupRules", "ec2:DescribeRouteTables", "ec2:DescribeNetworkAcls", "ec2:DescribeNetworkInterfaces", "ec2:DescribeNatGateways", "ec2:DescribeAddresses", "ec2:DescribeAccountAttributes" ], "Resource": "*" }, { "Sid": "VPCDeletePermissions", "Effect": "Allow", "Action": [ "ec2:DeleteSubnet", "ec2:DisassociateRouteTable", "ec2:DeleteSecurityGroup", "ec2:DeleteRoute", "ec2:DeleteNatGateway", "ec2:DeleteRouteTable", "ec2:DisassociateAddress", "ec2:DetachInternetGateway", "ec2:DeleteInternetGateway", "ec2:DeleteVpc" ], "Resource": "*" }, { "Sid": "NLBProxyPermissions", "Effect": "Allow", "Action": [ "elasticloadbalancing:DescribeLoadBalancers", "elasticloadbalancing:DescribeTags", "elasticloadbalancing:DescribeTargetGroups", "elasticloadbalancing:DescribeListeners", "elasticloadbalancing:CreateLoadBalancer", "elasticloadbalancing:CreateTargetGroup", "elasticloadbalancing:RegisterTargets", "elasticloadbalancing:SetIpAddressType", "elasticloadbalancing:SetSubnets", "elasticloadbalancing:AddTags", "elasticloadbalancing:DescribeLoadBalancerAttributes", "elasticloadbalancing:DeleteLoadBalancer", "elasticloadbalancing:ModifyLoadBalancerAttributes", "elasticloadbalancing:SetSecurityGroups", "elasticloadbalancing:ModifyTargetGroupAttributes", "elasticloadbalancing:DescribeTargetGroupAttributes", "elasticloadbalancing:DeleteTargetGroup", "elasticloadbalancing:DeleteListener", "elasticloadbalancing:CreateListener", "acm:DescribeCertificate", "acm:ImportCertificate", "acm:ListTagsForCertificate", "acm:DeleteCertificate", "ecs:CreateService", "ecs:DescribeServices", "ecs:UpdateService", "ecs:DeleteService", "ecs:TagResource" ], "Resource": "*" } ] }

  4. Click Next.

  5. Name the policy and click Create policy.

  6. In the left navigation bar, select Roles, and click Create role.

  7. Select Custom trust policy.

  8. Paste in the custom trust relationship JSON provided by integrate.ai (shown in the code panel on the right).

  9. Replace <workspace-name> with the value for your workspace provided by integrate.ai.

  10. Click Next.

  11. On the Add permissions page, search for and select the policy you just created.

  12. Click Next.

  13. Provide the following Role name: iai-taskrunner-provisioner. Important: Do not edit or change this name.

  14. Click Create role.

Copy and save the ARN for the provisioner role. Provide the ARN to any Workspace Administrator or Model Builder who will be creating task runners.

Create AWS Runtime Role and Policy

You must grant integrate.ai access to run the task runner in your cloud environment by creating a limited permission role in AWS for the runtime agent.

Runtime custom trust policy

# Runtime Custom trust policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::919740802015:role/IAI-API-Instance-<workspace-name>-prod"
                ]
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ],
            "Condition": {}
        }
    ]
}

The runtime role requires fewer permissions than the provisioner role. For increased security, it has much less access and is used only to run training tasks.

To create the runtime policy and role:

  1. In the AWS Console, go to IAM, select Policies, and click Create policy.
  2. On the Specify permissions page, click the JSON tab.
  3. Paste in the Runtime JSON policy provided below by integrate.ai.

    Runtime JSON policy - do not edit. Click to expand.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "AllowBatchDescribeJobs", "Effect": "Allow", "Action": [ "batch:DescribeJobs", "batch:TagResource" ], "Resource": "*" }, { "Sid": "AllowBatchAccess", "Effect": "Allow", "Action": [ "batch:TerminateJob", "batch:SubmitJob", "batch:TagResource", "batch:CancelJob" ], "Resource": [ "arn:aws:batch:*:*:job-definition/iai-fl-client-batch-job-*", "arn:aws:batch:*:*:job-queue/iai-fl-client-batch-job-queue-*", "arn:aws:batch:*:*:job/*" ] }, { "Sid": "AllowECSUpdateAccess", "Effect": "Allow", "Action": [ "ecs:DescribeContainerInstances", "ecs:DescribeTasks", "ecs:ListTasks", "ecs:UpdateContainerAgent", "ecs:StartTask", "ecs:StopTask", "ecs:RunTask" ], "Resource": [ "arn:aws:ecs:*:*:cluster/iai-fl-server-ecs-cluster-*", "arn:aws:ecs:*:*:task/iai-fl-server-ecs-cluster-*", "arn:aws:ecs:*:*:task-definition/iai-fl-server-fargate-job-*" ] }, { "Sid": "AllowECSReadAccess", "Effect": "Allow", "Action": [ "ecs:DescribeTaskDefinition" ], "Resource" : ["*"] }, { "Sid": "AllowPassRole", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::*:role/iai-fl-server-fargate-task-role-*-*", "arn:aws:iam::*:role/iai-fl-server-fargate-execution-role-*-*" ] }, { "Sid": "AllowSaveTokens", "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:DescribeParameters", "ssm:GetParameters", "ssm:GetParameter" ], "Resource": [ "arn:aws:ssm:*:*:parameter/fl-server-*-token", "arn:aws:ssm:*:*:parameter/fl-client-*-token" ] }, { "Sid": "AllowS3Access", "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject", "s3:GetObjectAcl", "s3:GetObjectVersion", "s3:ListBucketVersions" ], "Resource": [ "arn:aws:s3:::*.integrate.ai", "arn:aws:s3:::*.integrate.ai/*" ] }, { "Sid": "AllowKMSUsage", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:Encrypt", "kms:GenerateDataKey" ], "Resource": "*", "Condition": { "ForAnyValue:StringLike": { "kms:ResourceAliases": "alias/iai/*" } } }, { "Sid": "AllowLogReading", "Effect": "Allow", "Action": [ "logs:Describe*", "logs:List*", "logs:StartQuery", "logs:StopQuery", "logs:TestMetricFilter", "logs:FilterLogEvents", "logs:Get*" ], "Resource": [ "arn:aws:logs:*:*:log-group:iai-fl-server-fargate-log-group-*:log-stream:ecs/fl-server-fargate/*", "arn:aws:logs:*:*:log-group:/aws/batch/job:log-stream:iai-fl-client-batch-job-*" ] } ] }

  4. Click Next.

  5. Name the policy and click Create policy.

  6. In the left navigation bar, select Roles, and click Create role.

  7. Select Custom trust policy.

  8. Paste in the custom trust relationship JSON provided by integrate.ai (shown in the code panel on the right).

  9. Replace <workspace-name> with the value for your workspace provided by integrate.ai.

  10. Click Next.

  11. On the Add permissions page, search for and select the policy you just created.

  12. Click Next.

  13. Provide the following Role name: iai-taskrunner-runtime. Important: Do not edit or change this name.

  14. Click Create role.

Copy and save the ARN for the runtime role. Provide the ARN to any Workspace Administrator or Model Builder who will be creating task runners.

Role and policy configuration is now complete.

Azure configuration for task runners

Before you get started using integrate.ai in your Microsoft Azure cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud by creating a dedicated resource group and a limited-permission service principal for provisioning a task runner. The provisioner automatically creates a second service principal to execute Azure tasks using the task runner. This is a one-time process: once created, you can use this infrastructure for any task runners in your environment.

This section walks you through the required configuration.

  1. Create a Resource Group and provisioner service principal.
  2. Create a task runner.
  3. Use the task runner in a notebook to perform training tasks.

Create an Azure Resource Group & Service Principal

# Example Minimum Permission Policy Requirements for the Provisioner Service Principal
{
    "properties": {
        "roleName": "iai-provisioner",
        "description": "Provision new task runner",
        "assignableScopes": ["/"],
        "permissions": [
            {
                "actions": [
                   "*/read",
                    "Microsoft.Authorization/roleAssignments/write",
                    "Microsoft.Storage/storageAccounts/write",
                    "Microsoft.Storage/storageAccounts/listKeys/action",
                    "Microsoft.Storage/storageAccounts/delete"
                ],
                "notActions": [],
                "dataActions": [],
                "notDataActions": []
            }
        ]
    }
}

You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a dedicated resource group and service principal for the provisioner agent. A dedicated resource group is required for integrate.ai to provision resources into. You must provide the credentials for the service principal as part of the task runner creation process.

To provide all the necessary permissions, the user who creates the resource group and provisioner service principal must be an Azure AD Administrator.

Internally, integrate.ai will create a second service principal with reduced permissions to run tasks within the previously created resource group.

Permission Requirements for Task Runner Provisioner

  1. If not already available, install the Azure CLI. For more information, see the Azure CLI documentation.
  2. At the command prompt, type: az ad sp create-for-rbac --role="Owner" --scopes="/subscriptions/<your subscription ID>/resourcegroups/<your-resource-group>". Make sure that you specify the correct resource group name.
  3. Copy the output and save it. This information is required to connect a new task runner. Note: The output includes credentials that you must protect.
  4. From the Azure AD dashboard, add the Application Administrator permission to the service principal.
# Example CLI output:

Creating 'Owner' role assignment under scope '/subscriptions/<subscription ID>/resourcegroups/test-resource-group'
The output includes credentials that you must protect. Be sure that you do not include these credentials in your code or check the credentials into your source control. For more information, see https://aka.ms/azadsp-cli

{
  "appId": "<Client ID>",
  "displayName": "azure-cli-2023-04-13-14-57-09",
  "password": "<secret>",
  "tenant": "<tenant ID>"
}

The provisioner creates the following components and performs the required related tasks:

USING INTEGRATE.AI

Workspace Administrator Workflow

Workspace Administrators have full control over the entire workspace, from adding and removing users and assigning roles, to controlling administrative and billing information. There must always be at least one user with this role to manage the workspace.

  1. Invite users to the workspace.
  2. Create an AWS task runner or Azure task runner for the data custodians and model builders to use to register datasets and perform model training.
  3. If the enterprise IT landscape requires ingress and egress exceptions for firewalls, provide the IP addresses from the task runners to the IT Administrator so that the necessary ports can be opened for the white-listed IPs. See Whitelisting task runner IPs for details.

Create an AWS Task Runner

Task runners simplify the process of running training sessions on your data.

Note: before attempting to create a task runner, ensure you have completed the AWS configuration for task runners.

To create an AWS task runner:

  1. Log in to your integrate.ai workspace.
  2. In the left navigation bar, Click Settings.
  3. Under Workspace, click Task Runners.
  4. Click Register to start registering a new task runner.
  5. Select the service provider - Amazon Web Services.
  6. Provide the following information:

    • Task runner name - must be unique
    • Provisioner role ARN - the ARN created by the IT Administrator in the Create AWS Provisioner Role and Policy setup procedure.
    • Runtime role ARN - the ARN created by the IT Administrator in the Create AWS Runtime Role and Policy setup procedure.
    • Region - select the AWS region to run in from the dropdown
    • Storage Path - if you have an existing AWS S3 bucket that contains the data you want to work with, provide the URL here in the s3:// format. Otherwise, the task runner will create a bucket for you to upload data into (e.g. s3://{aws_taskrunner_profile}-{aws_taskrunner_name}.integrate.ai).
    • vCPU - the number of virtual CPUs the task runner is allowed to use. The default is 4.
    • Memory size (MB) - the amount of memory to allocate to the task runners. The default is 8192MB.

  7. Click Save. Wait for the status to change to Online.

After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there is no need to create a new one for each task.

Register a dataset (AWS)

For Exploratory Data Analysis (EDA) or Horizontal Federated Learning (HFL) tasks, register your dataset through the workspace UI, by following the steps below.

Support for VFL datasets will be added in a future release.

  1. Log in to your integrate.ai workspace.
  2. Click Library in the left navigation bar.
  3. On the Datasets tab, click Register dataset.
  4. Select a task runner to manage tasks related to your dataset. Note: If no task runners exist, ask your Workspace Administrator or a Model Builder to create one.
  5. Click Next.
  6. On the Dataset details and privacy controls page, type a name and description for the dataset.
  7. Specify the URI of the dataset, using the s3:// format.
  8. (Optional) If you have metadata to associate with the dataset, upload it in the Attachments section.
  9. To allow Custom Models to use your dataset, enable the Custom models toggle.
  10. Click Connect.

Your dataset is now registered and can be used in a notebook.

Use an AWS task runner and registered dataset in a notebook

eda_task_group_context = (
    SessionTaskGroup(eda_session)
    # first task is for the server
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))
    # client 1 uses a registered dataset
    .add_task(iai_tb_aws.eda(dataset_name="silo0_aws"))
    # client 2 uses a non-registered dataset
    .add_task(iai_tb_aws.eda(dataset_name="dataset_two", dataset_path=passive_train_path))
    .start()
)

After you have registered your dataset with a task runner, you only need to specify the dataset name in the task to be done.

In this example code for creating a task group, the first task is running the server. The second task is starting a client that uses a registered dataset. The third task is starting a client with a non-registered dataset; in this case, you must specify both dataset name and path.

Sample AWS notebook and data

The integrateai_taskrunner_AWS.ipynb notebook provides examples of how to use an AWS task runner for different federated learning tasks.

You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client or Server, as the task runner manages the client and server operations for you in your cloud environment.

Download the notebooks here, and the sample data here.

Create an Azure Task Runner

Task runners simplify the process of running training sessions on your data.

Note: before attempting to create a task runner, ensure you have completed the Azure configuration for task runners.

To create an Azure task runner:

  1. Log in to your integrate.ai workspace.
  2. In the left navigation bar, Click Settings.
  3. Under Workspace, click Task Runners.
  4. Click Register to start registering a new task runner.
  5. Select the service provider - Microsoft Azure.
  6. Provide the following information:

    • Task runner name - must be unique
    • Region - select from the list
    • Resource group - must be an existing dedicated resource group
    • Service principal ID - this is the appId from the Azure CLI output of creating a service principal.
    • Service principal secret - this is the password from the Azure CLI output
    • Subscription ID - the ID of your Microsoft Azure subscription. Can be found on the Azure dashboard.
    • Tenant ID - this is the tenantId from the Azure CLI output of creating a service principal.
    • vCPU - the number of virtual CPUs the task runner is allowed to use. The default is 4.
    • Memory size (MB) - the amount of memory to allocate to the task runners. The default is 8192MB.
    • Storage Path - if you have an existing Azure blob storage location that contains the data you want to work with, provide the URL here in the azure:// format. Otherwise, the task runner will create a storage blob for you to upload data into (e.g. azure://<taskrunnername>sessionstorage).
    • Storage Account - specify the storage account name. By default this is <taskrunnername>storage.

  7. Click Save. Wait for the status to change to Online.

After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there is no need to create a new one for each task.

Register a dataset (Azure)

For Exploratory Data Analysis (EDA) or Horizontal Federated Learning (HFL) tasks, register your dataset through the workspace UI, by following the steps below.

Support for VFL datasets will be added in a future release.

  1. Log in to your integrate.ai workspace.
  2. Click Library in the left navigation bar.
  3. On the Datasets tab, click Register dataset.
  4. Select a task runner to manage tasks related to your dataset. Note: If no task runners exist, ask your Workspace Administrator or a Model Builder to create one.
  5. Click Next.
  6. On the Dataset details and privacy controls page, type a name and description for the dataset.
  7. Specify the URI of the dataset, using the azure:// format.
  8. (Optional) If you have metadata to associate with the dataset, upload it in the Attachments section.
  9. To allow Custom Models to use your dataset, enable the Custom models toggle.
  10. Click Connect.

Your dataset is now registered and can be used in a notebook.

Use an Azure task runner and registered dataset in a notebook

eda_task_group_context = (
    SessionTaskGroup(eda_session)
    # first task is for the server
    .add_task(iai_tb_azure.fls(storage_path=az_storage_path))
    # client 1 uses a registered dataset
    .add_task(iai_tb_azure.eda(dataset_name="silo0_az"))
    # client 2 uses a non-registered dataset
    .add_task(iai_tb_azure.eda(dataset_name="dataset_two", dataset_path=passive_train_path))
    .start()
)

After you have registered your dataset with a task runner, you only need to specify the dataset name in the task to be done.

In this example code for creating a task group, the first task is running the server. The second task is starting a client that uses a registered dataset. The third task is starting a client with a non-registered dataset; in this case, you must specify both dataset name and path.

Sample Azure notebook and data

The integrateai_taskrunner_Azure.ipynb notebook provides examples of how to use an Azure task runner for different federated learning tasks.

You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client or Server, as the task runner manages the client and server operations for you in your cloud environment.

Download the notebooks here, and the sample data here.

Data Requirements

To run a training session, the data must be prepared according to the following standards:

integrate.ai supports horizontal federated learning (HFL) and vertical federated learning (VFL) to train models across different siloed datasets (or data silos) without transferring data between each silo, and supports tabular data for standard models.

You can create custom dataloaders for custom models, which allow for image and video data input as a folder instead of a flat file. For more information about dataloaders, see Building a custom model. Custom models can also be used for feature engineering or preprocessing.

HFL data requirements

  1. Data should be in Apache Parquet format (recommended) or .csv, and can be hosted locally, in an S3 bucket, or in an Azure blob.
  2. For custom models, the data can be in either a single file OR folder format.
  3. Data must be fully feature engineered. Specifically, the data frame has to be ready to be used as an input for a modelling pipeline. This means that:

    a. All columns must be numerical.

    b. Columns must not contain NULL values (missing values are imputed).

    c. Categorical variables are properly encoded (for example, by one-hot encoding).

    d. Continuous variables are normalized to have mean = 0 and std = 1. This is highly recommended for GLM and NN, but is not required for GBM.
  4. Feature engineering must be consistent across the silos. For example, if the datasets contain categorical values, such as postal codes, these values must be encoded the same way across all the datasets. For the postal code example, this means that the same postal code value translates to the same numerical values across all datasets that will participate in the training.
  5. Column names must be consistent across datasets. All column names (predictors and targets) must contain only alphanumeric characters (letters, numbers, dash -, underscore _) and start with a letter. You can select which columns you want to use in a specific training session.

VFL data requirements

  1. Data should be in Apache Parquet format (recommended) or .csv, and can be hosted locally, in an S3 bucket, or in an Azure blob.
  2. Data must be fully feature engineered. Specifically, the data frame has to be ready to be used as an input for a modelling pipeline. This means that:

    a. All columns must be numerical, with the exception of ID columns used for PRL.

    b. Columns must not contain NULL values (missing values are imputed).

    c. Categorical variables are properly encoded (for example, by one-hot encoding).

    d. Continuous variables are normalized to have mean = 0 and std = 1. This is highly recommended for GLM and NN, but is not required for GBM.
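
To make these requirements concrete, here is a minimal pandas sketch of preparing one silo's dataset. The column names (postal_code, age, income) and file names are hypothetical:

import pandas as pd

# Load one silo's raw data
df = pd.read_parquet("my_silo.parquet")

# Encode categorical variables (must be encoded the same way across all silos)
df = pd.get_dummies(df, columns=["postal_code"], dtype=float)

# Impute missing values (here, with column means)
df = df.fillna(df.mean(numeric_only=True))

# Normalize continuous variables to mean = 0 and std = 1 (recommended for GLM and NN)
for col in ["age", "income"]:
    df[col] = (df[col] - df[col].mean()) / df[col].std()

df.to_parquet("my_silo_prepared.parquet")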

Installing the SDK

Pre-requisites

To run the integrate.ai SDK samples and build models, ensure that your environment is configured properly.

Required software:

Installing the SDK

Generate an access token

To install the SDK and other components locally, you must generate an access token through the workspace UI.

  1. Log in to your workspace.
  2. On the getting started page, click Generate.
  3. Copy the access token and save it to a secure location.

Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.
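
For example, a minimal sketch of reading the token from an environment variable in Python (assuming you have exported IAI_TOKEN in your shell beforehand):

import os

# Read the access token from the environment instead of hardcoding it
IAI_TOKEN = os.environ["IAI_TOKEN"]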

Install integrate.ai packages

# Install the integrate-ai CLI from PyPI
pip install integrate-ai

# Use the CLI to install the SDK with your access token
iai sdk install --token <IAI_TOKEN>

Updating your SDK

Typically, each time integrate.ai releases a software update, you must update your local SDK installation. To check the SDK version, run iai sdk version.

You can compare the version against the feature release version in the Release Notes to ensure that your environment is up to date.

The command to update the SDK is the same as you used to install it initially.

iai sdk install --token <IAI_TOKEN>

For Developers

For those who are interested in integrating the system with their own product, or writing software that makes use of integrate.ai capabilities, there are several options, including a full-featured SDK and a RESTful API. Refer to the API Documentation for details of the functions and operations available through the SDK.
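
For local scripting against the API, authenticating the SDK client typically looks like the following hedged sketch (the exact import path and arguments may differ by SDK version; verify against the API Documentation):

import os

from integrate_ai_sdk.api import connect

# Assumed entry point based on the sample notebooks; verify against
# the API Documentation for your SDK version.
client = connect(token=os.environ["IAI_TOKEN"])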

For local development, you can install the integrate.ai client and server using the same access token you used to install the SDK. The client and server are Docker container images. Alternatively, you can use task runners and only run the SDK locally.

# Pull the client and server Docker images for local development
iai client pull --token <IAI_TOKEN>
iai server pull --token <IAI_TOKEN>

Updating your local environment

When a new version of an integrate.ai package is released, follow these steps to update your local development environment.

Note: If you are using task runners, you only need to update your SDK.

Check version numbers

From a command prompt or terminal, use the pip show integrate-ai command to see the version of the integrate-ai command line interface (CLI).

To check the SDK version, run iai sdk version.

You can compare the version against the feature release version in the Release Notes to ensure that your environment is up to date.
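
For example, from a terminal:

# Show the installed integrate-ai CLI version
pip show integrate-ai

# Show the installed SDK version
iai sdk version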

Get the latest packages

Run the following command to update the CLI and all dependent packages:

pip install -U integrate-ai

Update the SDK: iai sdk install

Update the client: iai client pull

Update the server: iai server pull

HFL MODEL TRAINING

Linear Inference Sessions

An overview and example of a linear inference model for performing tasks such as GWAS in HFL.

The built-in model package iai_linear_inference trains a bundle of linear models for the target of interest against a specified list of predictors. It obtains the coefficients and variance estimates, and also calculates the p-values from the corresponding hypothesis tests. Linear inference is particularly useful for genome-wide association studies (GWAS), to identify genomic variants that are statistically associated with a risk for a disease or a particular trait.

This is a horizontal federated learning (HFL) model package.

The integrateai_taskrunner_AWS.ipynb, available in the integrate.ai sample repository, demonstrates this model package using the sample data that is available for download here.

Follow the instructions in the Installing the SDK section to prepare a local test environment for this tutorial.

Overview of the iai_linear_inference package

# Example model_config for a binary target

model_config_logit = {
    "strategy": {"name": "LogitRegInference", "params": {}},
    "seed": 23,  # for reproducibility
}

There are two strategies available in the package: LogitRegInference for binary targets and LinearRegInference for continuous targets.

# Example data_config

data_config_logit = {
    "target": "y",
    "shared_predictors": ["x1", "x2"],
    "chunked_predictors": ["x0", "x3", "x10", "x11"]
}

The data_config dictionary should include the following three fields:

  • target - the target (response) column of interest.
  • shared_predictors - predictor columns that are included in every model of the bundle.
  • chunked_predictors - predictor columns that are included one at a time, producing one model per column.

Note: The columns in all the fields can be specified as either names/strings or indices/integers.

With this example data configuration, the session trains four logistic regression models with y as the target, and x1, x2 plus any one of x0, x3, x10, x11 as predictors.
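
For intuition, here is a toy, centralized sklearn sketch of the bundle this configuration describes. The actual training is federated and handled by the package; the file name comes from the sample data:

import pandas as pd
from sklearn.linear_model import LogisticRegression

df = pd.read_parquet("train_silo0.parquet")

# One logistic regression per chunked predictor; every model also
# includes the shared predictors x1 and x2.
models = {}
for chunked in ["x0", "x3", "x10", "x11"]:
    X = df[["x1", "x2", chunked]]
    models[chunked] = LogisticRegression().fit(X, df["y"])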

Create a linear inference training session

# Example training session

training_session_logit = client.create_fl_session(
    name="Testing linear inference session",
    description="I am testing linear inference session creation through a notebook",
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_linear_inference",
    model_config=model_config_logit,
    data_config=data_config_logit,
).start()

training_session_logit.id

For this example, there are two (2) training clients and the model is trained over five (5) rounds.

Argument (type) - Description

name (str) - Name to set for the session
description (str) - Description to set for the session
min_num_clients (int) - Number of clients required to connect before the session can begin
num_rounds (int) - Number of rounds of federated model training to perform
package_name (str) - Name of the model package to be used in the session
model_config (dict) - Contains the model configuration to be used for the session
data_config (dict) - Contains the data configuration to be used for the session

Ensure that you have downloaded the sample data. If you saved it to a location other than your Downloads folder, specify the data_path to the correct location.

Expected path:

data_path = "~/Downloads/synthetic"

Start the linear inference training session

import subprocess

client_1 = subprocess.Popen(
    f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1-inference --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

client_2 = subprocess.Popen(
    f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2-inference --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

This example demonstrates starting a training session locally. The clients joining the session are named client-1-inference and client-2-inference, respectively.

Poll for linear inference session status

import time

current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)

    # poll for status and round
    if current_status != training_session_logit.status:
        print("Session status: ", training_session_logit.status)
        current_status = training_session_logit.status
    if current_round != training_session_logit.round and training_session_logit.round > 0:
        print("Session round: ", training_session_logit.round)
        current_round = training_session_logit.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()

print(
    "client_1 finished with return code: %d\noutput: %s\n  %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n  %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)

Use a polling function, such as the example provided, to wait for the session to complete.

View training metrics and model details

# Example output

{'session_id': '3cdf4be992',
 'federated_metrics': [{'loss': 0.6931747794151306},
  {'loss': 0.6766608953475952},
  {'loss': 0.6766080856323242},
  {'loss': 0.6766077876091003},
  {'loss': 0.6766077876091003}],
 'client_metrics': [{'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_loss': 0.6931748060977674,
    'test_accuracy': 0.4995,
    'test_roc_auc': 0.5,
    'test_num_examples': 4000},
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6931748060977674,
    'test_accuracy': 0.4995,
    'test_roc_auc': 0.5,
    'test_num_examples': 4000}},
  {'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_num_examples': 4000,
    'test_loss': 0.6766608866775886,
    'test_roc_auc': 0.5996664746664747,
    'test_accuracy': 0.57625},
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_num_examples': 4000,
    'test_loss': 0.6766608866775886,
    'test_accuracy': 0.57625,
    'test_roc_auc': 0.5996664746664747}},
  {'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6766080602706078,
    'test_accuracy': 0.5761875,
    'test_num_examples': 4000,
...
   'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_accuracy': 0.5761875,
    'test_roc_auc': 0.5996632246632246,
    'test_num_examples': 4000,
    'test_loss': 0.6766078165060236}}],
 'latest_global_model_federated_loss': 0.6766077876091003}

Once the session is complete, you can view the training metrics and model details, such as the model coefficients and p-values. In this example, since a bundle of models is being trained, the metrics are the average values across all the models.

training_session_logit.metrics().as_dict()

Plot the metrics

training_session_logit.metrics().plot()

Example output:

Retrieve the trained model

# Example of retrieving p-values

pv = model_logit.p_values()
pv
# Example p-value output:

x0      112.350396
x3      82.436540
x10     0.999893
x11     27.525280
dtype: float64

The LinearInferenceModel object can be retrieved with the session's model().as_pytorch() method. The relevant information, such as p-values, can be accessed directly from the model object.

model_logit = training_session_logit.model().as_pytorch()

Retrieve the p-values

The .p_values() function returns the p-values of the chunked predictors.

Summary information

The .summary method fetches the coefficient, standard error, and p-value of the model corresponding to the specified predictor.

# Example of fetching summary

summary_x0 = model_logit.summary("x0")
summary_x0

Example summary output:

Making predictions from a linear inference session

from torch.utils.data import DataLoader
from integrate_ai_sdk.packages.LinearInference.dataset import ChunkedTabularDataset

ds = ChunkedTabularDataset(path=f"{data_path}/test.parquet", **data_config_logit)
dl = DataLoader(ds, batch_size=len(ds), shuffle=False)
x, _ = next(iter(dl))
y_pred = model_logit(x)
y_pred

You can also make predictions with the resulting bundle of models when the data is loaded by the ChunkedTabularDataset from the iai_linear_inference package. The predictions will be of shape (n_samples, n_chunked_predictors) where each column corresponds to one model from the bundle.

# Example prediction output:

tensor([[0.3801, 0.3548, 0.4598, 0.4809],
        [0.4787, 0.3761, 0.4392, 0.3579],
        [0.5151, 0.3497, 0.4837, 0.5054],
        ...,
        [0.7062, 0.6533, 0.6516, 0.6717],
        [0.3114, 0.3322, 0.4257, 0.4461],
        [0.4358, 0.4912, 0.4897, 0.5110]], dtype=torch.float64)

Gradient Boosted Models (HFL-GBM)

Gradient boosting is a machine learning algorithm for building predictive models that helps minimize the bias error of the model. The gradient boosting model provided by integrate.ai is an HFL model that uses the sklearn implementation of HistGradientBoostingClassifier for classification tasks and HistGradientBoostingRegressor for regression tasks.

The GBM sample notebook (integrateai_api_gbm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

Prerequisites

  1. Open the integrateai_api_gbm.ipynb notebook to test the code as you walk through this tutorial.
  2. Download the sample dataset to use with this tutorial. The sample files are:

    • train_silo0.parquet - training data for dataset 1
    • train_silo1.parquet - training data for dataset 2
    • test.parquet - test data, to be used when joining a session

Review the sample Model Configuration

data_schema = {
    "predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
    "target": "y",
}

model_config = {
    "strategy": {
        "name": "HistGradientBoosting",
        "params": {}
    },
    "model": {
        "params": {
            "max_depth": 4, 
            "learning_rate": 0.05,
            "random_state": 23, 
            "max_bins": 128,
            "sketch_relative_accuracy": 0.001,
            "max_iter": 100,
        }
    },

    "ml_task": {"type": "classification", "params": {}}, 

    "save_best_model": {
        "metric": None,
        "mode": "min"
    },
}

integrate.ai has a model class available for Gradient Boosted Models, called iai_gbm. This model is defined using a JSON configuration file during session creation.

The strategy for GBM is HistGradientBoosting.

You can adjust the following parameters as needed:

Set the machine learning task type to either classification or regression.

Specify any parameters associated with the task type in the params section.

The save_best_model option allows you to set the metric and mode for model saving. By default, metric is set to None, which saves the model from the latest round, and mode is set to min.
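
For example, to keep the best model by a reported metric instead of the latest one ("test_loss" here is an assumed metric name; match it to the metrics your session reports):

# Hypothetical override of the default save_best_model behaviour
model_config["save_best_model"] = {"metric": "test_loss", "mode": "min"}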

The notebook also provides a sample data schema. For the purposes of testing GBM, use the sample schema as shown.

Create a GBM training session

training_session = client.create_fl_session(
    name="HFL session testing GBM",
    description="I am testing GBM session creation through a notebook",
    min_num_clients=2,
    num_rounds=10,
    package_name="iai_gbm",
    model_config=model_config,
    data_config=data_schema,
).start()

training_session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training clients (two datasets) and ten rounds (or trees). It returns a session ID that you can use to track and reference your session.

Join the training session

import subprocess

data_path = "~/Downloads/synthetic"

client_1 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

client_2 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.

Poll for Session Results

Sessions may take some time to run depending on the computer environment. In the sample notebook and this tutorial, we poll the server to determine the session status.

View the training metrics

# Retrieve the model metrics

training_session.metrics().as_dict()

After the session completes successfully, you can view the training metrics and start making predictions.

  1. Retrieve the model metrics with as_dict.
  2. Plot the metrics.

Example output:

{'session_id': '9fb054bc24',
 'federated_metrics': [{'loss': 0.6876291940808297},
  {'loss': 0.6825978879332543},
  {'loss': 0.6780059585869312},
  {'loss': 0.6737175708711147},
  {'loss': 0.6697578155398369},
  {'loss': 0.6658972384035587},
  {'loss': 0.6623568458259106},
  {'loss': 0.6589517279565335},
  {'loss': 0.6556690569519996},
  {'loss': 0.6526266353726387},
  {'loss': 0.6526266353726387}],
 'rounds': [{'user info': {'test_loss': 0.6817054933875072,
    'test_roc_auc': 0.6868823702288674,
    'test_accuracy': 0.7061688311688312,
    'test_num_examples': 8008},
   'user info': {'test_accuracy': 0.5720720720720721,
    'test_num_examples': 7992,
    'test_roc_auc': 0.6637941733389123,
    'test_loss': 0.6935647668830733}},
  {'user info': {'test_accuracy': 0.5754504504504504,
    'test_roc_auc': 0.6740578481919338,
    'test_num_examples': 7992,
    'test_loss': 0.6884922753070576},
   'user info': {'test_loss': 0.6767152831608197,
...
   'user info': {'test_loss': 0.6578156923815811,
    'test_num_examples': 7992,
    'test_roc_auc': 0.7210704078520924,
    'test_accuracy': 0.6552802802802803}}],
 'latest_global_model_federated_loss': 0.6526266353726387}

Plot the metrics

# Plot the metrics

fig = training_session.metrics().plot()

Example (image)

Retrieve the trained model as an sklearn object:

model = training_session.model().as_sklearn()
model

Example (image)

Load the test data

import pandas as pd

test_data = pd.read_parquet(f"{data_path}/test.parquet")
test_data.head()

Example (image)

Prepare the test data

Y = test_data["y"]

X = test_data[["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"]]

Run model predictions

model.predict(X)

Result: array([0, 1, 0, ..., 0, 0, 1])

from sklearn.metrics import roc_auc_score
y_hat = model.predict_proba(X)
roc_auc_score(Y, y_hat[:, 1])

Result: 0.7082738332738332

Note: When the training sample sizes are small, this model is more likely to overfit the training data.

VFL MODEL TRAINING

Private Record Linkage (PRL) sessions

Private record linkage sessions create intersection and alignment among datasets to prepare them for vertical federated learning (VFL).

In a vertical federated learning process, two or more parties collaboratively train a model using datasets that share a set of overlapping features. These datasets generally each contain distinct data with some overlap. This overlap is used to define the intersection of the sets. Private record linkage (PRL) uses the intersection to create alignment between the sets so that a shared model can be trained.

Overlapping records are determined privately through a PRL session, which combines Private Set Intersection with Private Record Alignment.

For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for model training.

PRL Session Overview

In PRL, two parties submit paths to their datasets so that they can be aligned to perform a machine learning task.

  1. ID columns (id_columns) are used to produce a hash that is sent to the server for comparison. The secret for this hash is shared between the clients, and the server has no knowledge of it. This comparison is the Private Set Intersection (PSI) part of PRL.
  2. Once compared, the server orchestrates the data alignment because it knows which indices of each dataset are in common. This is the Private Record Alignment (PRA) part of PRL.

For information about privacy when performing PRL, see PRL Privacy for VFL.
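
To make the PSI step concrete, here is a toy, self-contained sketch of the idea (not the platform's actual protocol): both clients HMAC their ID columns with a shared secret, so the server can intersect hashes without learning the raw IDs.

import hashlib
import hmac

# Secret shared between the clients; the server never sees it
shared_secret = b"example-secret-known-only-to-clients"

def hash_ids(ids):
    return {hmac.new(shared_secret, str(i).encode(), hashlib.sha256).hexdigest() for i in ids}

party_a = hash_ids([101, 102, 103, 104])  # e.g. imaging centre records
party_b = hash_ids([103, 104, 105])       # e.g. hospital records

# The server compares hash sets to find overlapping records
overlap = party_a & party_b
print(len(overlap))  # 2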

PRL Session Example

Use the integrateai_batch_client_prl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required.

This example uses AWS Fargate and Batch to run the session.

  1. Complete the Environment Setup.
  2. Ensure that you have the correct roles and policies for Fargate and Batch. See Running AWS Batch jobs with the SDK and Running a training server on AWS Fargate for details.
  3. Authenticate to the integrate.ai API client.

Create a configuration for the PRL session.

# Example data config

prl_data_config = {
    "clients": {
        "passive_client": {"id_columns": ["id"]},
        "active_client": {"id_columns": ["id"],},
    }
}

Specify a prl_data_config that indicates the columns to use as identifiers when linking the datasets to each other. The number of items in the config specifies the number of expected clients. In this example, there are two items and therefore two clients submitting data. Their datasets are linked by the "id" column in any provided datasets.

Create the session

# Create session example

prl_session = client.create_prl_session(
    name="Testing notebook - PRL",
    description="I am testing PRL session creation through a notebook",
    data_config=prl_data_config
).start()

prl_session.id

To create the session, specify the data_config that contains the client names and columns to use as identifiers to link the datasets. For example: prl_data_config.

These client names are referenced for the compute on the PRL session and for any sessions that use the PRL session downstream.

Specify AWS parameters and credentials

# Example data paths in s3 
active_train_path = 's3://<path to dataset>/active_train.csv'
passive_train_path = 's3://<path to dataset>/passive_train.csv'
active_test_path = 's3://<path to dataset>/active_test.csv'
passive_test_path = 's3://<path to dataset>/passive_test.csv'

# Specify the AWS parameters
cluster = "iai-server-ecs-cluster"
task_definition = "iai-server-fargate-job"
model_storage = "s3://<path to storage>"
security_group = "<security group name>"
subnet_id = "<subnet>" # Public subnet (routed via IGW)
job_queue='iai-client-batch-job-queue'
job_def='iai-client-batch-job'

# Example of using temporary credentials
import os

aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'REGION': os.environ.get("AWS_REGION"),
}
  1. Specify the paths to the datasets and the AWS Batch job information.
  2. Specify your AWS credentials if you are generating temporary ones. Otherwise, use the default profile credentials.

Create a task builder and task group

from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup

# Creating task builder objects

task_server = taskbuilder_aws.fargate(
    cluster=cluster,
    task_definition=task_definition)

tb = taskbuilder_aws.batch( 
    job_queue=job_queue,
    aws_credentials=aws_creds,
    cpu_job_definition=job_def)

task_group_context = SessionTaskGroup(prl_session)\
    .add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=<client>))\
    .add_task(tb.prl(train_path=passive_train_path, test_path=passive_test_path, vcpus='2', memory='16384', client=client, client_name="passive_client"))\
    .add_task(tb.prl(train_path=active_train_path, test_path=active_test_path, vcpus='2', memory='16384', client=client, client_name="active_client"))\
    .start()
  1. Set up the task builder and task group.
  2. Import the taskbuilder and taskgroup from the SDK.
  3. Specify the server and batch information to create the task builder objects.
  4. Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.
  5. Set the train_path, test_path, and client_name for each task. The client_name must be the same name as specified in the data_config file.
  6. Optional: Set the vcpus and memory parameters to override the Batch job definition.

Monitor submitted jobs

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK. The following code returns the session ID that is included in the job name.

# session available in group context after submission
print(task_group_context.session.id)

Next, you can check the status of the tasks.

# status of tasks submitted
task_group_status = task_group_context.status()
for task_status in task_group_status:
    print(task_status)

Submitted tasks are in the pending state until the clients join and the session is started. Once started, the status changes to running.

# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
task_group_context.wait(300)

View the overlap statistics

When the session is complete, you can view the overlap statistics for the datasets.

prl_session.metrics().as_dict()

Example result:

{'session_id': '07d0f8358d',
 'federated_metrics': [],
 'client_metrics': {'passive_client': {'train': {'n_records': 14400,
    'n_overlapped_records': 12963,
    'frac_overlapped': 0.9},
   'test': {'n_records': 3600,
    'n_overlapped_records': 3245,
    'frac_overlapped': 0.9}},
  'active_client': {'train': {'n_records': 14400,
    'n_overlapped_records': 12963,
    'frac_overlapped': 0.9},
   'test': {'n_records': 3600,
    'n_overlapped_records': 3245,
    'frac_overlapped': 0.9}}}}

To run a VFL training session on the linked datasets, see VFL SplitNN Model Training.

To perform exploratory data analysis on the intersection, see EDA Intersect.

VFL SplitNN Model Training

In a vertical federated learning (VFL) process, two or more parties collaboratively train a model using datasets that share a set of overlapping records. Each party holds partial information (a different set of features) about the overlapped subjects in the dataset. Therefore, before running a VFL training session, a private record linkage (PRL) session is performed to find the intersection and create alignment between datasets.

There are two types of parties participating in the training: the active party, which holds the labels (the training target), and the passive party, which contributes predictor features only.

For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for VFL model training.

VFL Session Overview

A hospital may have patient blood tests and outcome information on cancer, but imaging data is owned by an imaging centre. They want to collaboratively train a model for cancer diagnosis based on the imaging data and blood test data. The hospital (active party) would own the outcome and patient blood tests and the Imaging Centre (passive party) would own the imaging data.

A simplified model of the process is shown below.

integrate.ai VFL Flow

The following diagram outlines the training flow in the integrate.ai implementation of VFL.

VFL Training Session Example

Use the integrateai_fargate_batch_client_vfl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required. You can download sample notebooks here.

The notebook demonstrates both the PRL session and the VFL train and predict sessions.

Note: This example uses AWS Fargate and Batch to run the session.

Complete the Environment Setup

model_config = {
    "strategy": {"name": "SplitNN", "params": {}},
    "model": {
        "feature_models": {
            "passive_client": {"params": {"input_size": 7, "hidden_layer_sizes": [6], "output_size": 5}},
            "active_client": {"params": {"input_size": 8, "hidden_layer_sizes": [6], "output_size": 5}},
        },
        "label_model": {"params": {"hidden_layer_sizes": [5], "output_size": 2}},
    },
    "ml_task": {
        "type": "classification",
        "params": {
            "loss_weights": None,
        },
    },
    "optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
    "seed": 23,  # for reproducibility
}
data_config = {
        "passive_client": {
            "label_client": False,
            "predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"],
            "target": None,
        },
        "active_client": {
            "label_client": True,
            "predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"],
            "target": "y",
        },
    }
  1. Ensure that you have run a PRL session to obtain the aligned dataset. The PRL session ID is required for the VFL training session.
  2. Create a model_config and a data_config for the VFL session.

Parameters:

Create and start a VFL training session

vfl_train_session = client.create_vfl_session(
    name="Testing notebook - VFL Train",
    description="I am testing VFL Train session creation through a notebook",
    prl_session_id=prl_session.id,
    vfl_mode='train',
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_ffnet",
    data_config=data_config,
    model_config=model_config
).start()

vfl_train_session.id

Specify the PRL session ID and ensure that the vfl_mode is set to train.

Set up the task builder and task group for VFL training

vfl_task_group_context = (SessionTaskGroup(vfl_train_session)\
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=active_train_path, 
                                    test_path=active_test_path, 
                                    batch_size=1024,  
                                    client_name="active_client", 
                                    storage_path = f"{aws_storage_path}/vfl/{vfl_train_session.id}"))\
    .add_task(iai_tb_aws.vfl_train(train_path=passive_train_path, 
                                    test_path=passive_test_path, 
                                    batch_size=1024, 
                                    client_name="passive_client", 
                                    storage_path = f"{aws_storage_path}/vfl/{vfl_train_session.id}"))\
    .start())

Create a task in the task group for the server, and for each client. The number of client tasks in the task group must match the number of clients specified in the data_config used to create the session.

The following parameters are required for each client task, as shown in the example:

  * train_path and test_path - the paths to the client's train and test datasets
  * client_name - must match the name specified in the data_config used to create the session
  * storage_path - the location where the trained model is stored

batch_size can also be set per task.

Monitor submitted VFL training jobs

# Check the status of the tasks
for i in vfl_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))

vfl_task_group_context.monitor_task_logs()

# Wait for the tasks to complete (success = True)
vfl_task_group_context.wait(60*5, 2)

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the notebook, as shown.

View the VFL training metrics

# Example results

{'session_id': '498beb7e6a',
 'federated_metrics': [{'loss': 0.6927943530912943},
  {'loss': 0.6925891094472265},
  {'loss': 0.6921983339753467},
  {'loss': 0.6920029462394067},
  {'loss': 0.6915351291650617}],
 'client_metrics': [{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.5286237121001411,
    'test_num_examples': 3245,
    'test_loss': 0.6927943530912943,
    'test_accuracy': 0.5010785824345146}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_num_examples': 3245,
    'test_accuracy': 0.537442218798151,
    'test_roc_auc': 0.5730010669487545,
    'test_loss': 0.6925891094472265}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_accuracy': 0.550693374422188,
    'test_roc_auc': 0.6073282812853845,
    'test_loss': 0.6921983339753467,
    'test_num_examples': 3245}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_loss': 0.6920029462394067,
    'test_roc_auc': 0.6330078151716465,
    'test_accuracy': 0.5106317411402157,
    'test_num_examples': 3245}},
  {'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.6495852274713467,
    'test_loss': 0.6915351291650617,
    'test_accuracy': 0.5232665639445301,
    'test_num_examples': 3245}}]}

Once the session completes successfully, you can view the training metrics.

metrics = vfl_train_session.metrics().as_dict()
metrics

Plot the VFL training metrics.

fig = vfl_train_session.metrics().plot()

Example of plotted training metrics

VFL Prediction Session Example

This example continues the workflow in the previous sections: PRL Session Example and VFL Training Session Example.

Create and start a VFL prediction session

# Example configuration of a VFL predict session

vfl_predict_session = client.create_vfl_session(
    name="Testing notebook - VFL Predict",
    description="I am testing VFL Predict session creation with an AWS task runner through a notebook",
    prl_session_id=prl_session.id,
    training_session_id=vfl_train_session.id,
    vfl_mode="predict",
    data_config=data_config
).start()

vfl_predict_session.id

To create a VFL prediction session, specify the PRL session ID (prl_session_id) and the VFL train session ID (training_session_id) from your previous successful PRL and VFL sessions.

Set the vfl_mode to predict.

Specify the full path for the storage location for your predictions, including the file name.

#Where to store VFL predictions - must be full path and file name
vfl_predict_active_storage_path = f's3://{base_aws_bucket}/vfl_predict/active_predictions'
vfl_predict_passive_storage_path = f's3://{base_aws_bucket}/vfl_predict/passive_predictions'

Create and start a task group for the prediction session

vfl_predict_task_group_context = (SessionTaskGroup(vfl_predict_session)\
.add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
.add_task(iai_tb_aws.vfl_predict(
        client_name="active_client", 
        dataset_path=active_test_path, 
        batch_size=1024, 
        raw_output=False,
        storage_path = f"{vfl_predict_active_storage_path}_{vfl_predict_session.id}.csv"))\
.add_task(iai_tb_aws.vfl_predict(
        client_name="passive_client",
        dataset_path=passive_test_path,
        batch_size=1024,
        raw_output=False,
        storage_path = f"{vfl_predict_passive_storage_path}_{vfl_predict_session.id}.csv"))\
.start())

Create a task in the task group for the server, and for each client. The number of client tasks in the task group must match the number of clients specified in the data_config used to create the PRL and VFL train sessions.

The following parameters are required for each client task, as shown in the example:

  * client_name - must match the name specified in the data_config
  * dataset_path - the path to the dataset to generate predictions for
  * storage_path - the full path and file name where the predictions are stored

raw_output and batch_size can also be set per task.

Monitor submitted VFL prediction jobs

# Example of monitoring tasks

for i in vfl_predict_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))

vfl_predict_task_group_context.monitor_task_logs()

# Wait for the tasks to complete (success = True)

vfl_predict_task_group_context.wait(60*5, 2)

Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the notebook, as shown.

View VFL predictions

# Retrieve the metrics

metrics = vfl_predict_session.metrics().as_dict()
metrics

After the predict session completes successfully, you can view the predictions from the Active party and evaluate the performance.

presigned_result_urls = vfl_predict_session.prediction_result()

print(vfl_predict_active_storage_path)
df_pred = pd.read_csv(presigned_result_urls.get(f"{vfl_predict_active_storage_path}_{vfl_predict_session.id}.csv"))

df_pred.head()

Example output:

Gradient Boosted Models for VFL

Gradient boosting is a machine learning algorithm for building predictive models that helps minimize the bias error of the model. The gradient boosting model for VFL provided by integrate.ai uses the sklearn implementation of HistGradientBoostingClassifier for classification tasks and HistGradientBoostingRegressor for regression tasks.

The VFL-GBM sample notebook (integrateai_taskrunner_vfl_gbm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information.

Prerequisites

  1. Open the integrateai_taskrunner_vfl_gbm.ipynb notebook to test the code as you walk through this tutorial.
  2. Download the sample dataset to use with this tutorial. The sample files are:
     * active_train.parquet - training data for the active party
     * active_test.parquet - test data for the active party, used when joining a session
     * passive_train.parquet - training data for the passive party
     * passive_test.parquet - test data for the passive party, used when joining a session

Run a PRL session to align the datasets for VFL-GBM

Before you run a VFL session, you must first run a PRL session to align the datasets. The sample notebook provides two examples of running a PRL session with different match thresholds.

For more information, see Private Record Linkage (PRL) Sessions.

Review the sample Model Configuration for GBM

model_config = {
    "strategy": {"name": "VflGbm", "params": {"hide_intersection": False}},
    "model": {
        "params": {
            "max_depth": 4,
            "learning_rate": 0.05,
            "random_state": 23,
            "max_bins": 255,
        }
    },
    "ml_task": {"type": "classification", "params": {}},
}

data_config = {
    "passive_client": {
        "label_client": False,
        "predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"],
        "target": None,
    },
    "active_client": {
        "label_client": True,
        "predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"],
        "target": "y",
    },
}

integrate.ai has a model class available for Gradient Boosted Models, called iai_gbm. This model is defined using a JSON configuration file during session creation.

The strategy for VFL-GBM is VflGbm. Note that this is different from the strategy name for an HFL GBM session, which is HistGradientBoosting.

You can adjust the following parameters as needed:

  * max_depth - the maximum depth of each tree
  * learning_rate - the shrinkage applied to each tree's contribution
  * random_state - the seed used for reproducibility
  * max_bins - the maximum number of bins used to bucket continuous feature values

For more information, see the scikit documentation for HistGradientBoostingClassifier.

Set the machine learning task type to either classification or regression.

Specify any parameters associated with the task type in the params section.

The notebook also provides a sample data schema. For the purposes of testing VFL-GBM, use the sample schema as shown.

Create a VFL-GBM training session

vfl_train_session = client.create_vfl_session(
    name="Testing notebook - VFL-GBM training",
    description="I am testing VFL GBM training session creation through a notebook",
    prl_session_id=prl_session.id,
    vfl_mode="train",
    min_num_clients=2,
    num_rounds=5,
    package_name="iai_ffnet",
    data_config=data_config,
    model_config=model_config,
    startup_mode="external"
).start()

vfl_train_session.id 

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.

Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training clients (two datasets) and five rounds. It returns a session ID that you can use to track and reference your session.

Create and start a task group to run the training session

# Create and start a task group with one task for the server, and one for each of the clients joining the session

vfl_task_group_context = (SessionTaskGroup(vfl_train_session)\
    .add_task(iai_tb_aws.fls(storage_path=storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=active_train_path, 
                                    test_path=active_test_path, 
                                    batch_size=1024,  
                                    client_name="active_client", 
                                    storage_path=aws_storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=passive_train_path, 
                                    test_path=passive_test_path, 
                                    batch_size=1024, 
                                    client_name="passive_client", 
                                    storage_path=aws_storage_path))\
    .start())

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.

Each client must have a unique name that matches the name specified in the data_config. For example, active_client and passive_client.

Poll for Session Results

# Check the status of the tasks

for i in vfl_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))

vfl_task_group_context.monitor_task_logs()

vfl_task_group_context.wait(60*5, 2)

Sessions may take some time to run depending on the compute environment. In the sample notebook and this tutorial, we poll the server to determine the session status.

View the training metrics

metrics = vfl_train_session.metrics().as_dict()
metrics

fig = vfl_train_session.metrics().plot()

After the session completes successfully, you can view the training metrics and start making predictions.

  1. Retrieve the model metrics as_dict.
  2. Plot the metrics.

VFL-GBM Prediction

# Create and start a VFL predict session

vfl_predict_session = client.create_vfl_session(
    name="Testing notebook - VFL Predict",
    description="I am testing VFL Predict session creation with an AWS task runner through a notebook",
    prl_session_id=prl_session.id,
    training_session_id=vfl_train_session.id,
    vfl_mode="predict",
    data_config=data_config,
    startup_mode="external"
).start()

vfl_predict_session.id 

After you have completed a successful PRL and VFL train session, you can use those sessions to create a VFL prediction session.

For more information about VFL predict mode, see VFL Prediction Session Example.

Generalized Linear Models for VFL (VFL-GLM)

The VFL-GLM sample notebook (integrateai_taskrunner_vfl_glm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information.

Prerequisites

  1. Open the integrateai_taskrunner_vfl_glm.ipynb notebook to test the code as you walk through this tutorial.
  2. Make sure you have downloaded the sample files for VFL: https://s3.ca-central-1.amazonaws.com/public.s3.integrate.ai/integrate_ai_examples/vfl.zip and uploaded them to your S3 bucket or Azure storage.
  3. Define the dataset paths as shown in the sample notebook.
aws_taskrunner_profile = "<myworkspace>" # This is your workspace name
aws_taskrunner_name = "<mytaskrunner>" # Task runner name - must match what was supplied in UI to create task runner

base_aws_bucket = f'{aws_taskrunner_profile}-{aws_taskrunner_name}.integrate.ai'

# Example datapaths. Make sure that the data you want to work with exists in the base_aws_bucket for your task runner.
# Sample PRL/VFL datapaths
active_train_path = f's3://{base_aws_bucket}/vfl/active_train.parquet'
active_test_path = f's3://{base_aws_bucket}/vfl/active_test.parquet'
passive_train_path = f's3://{base_aws_bucket}/vfl/passive_train.parquet'
passive_test_path = f's3://{base_aws_bucket}/vfl/passive_test.parquet'

#Where to store the trained model
aws_storage_path = f's3://{base_aws_bucket}/model'

#Where to store VFL predictions - must be full path and file name
vfl_predict_active_storage_path = f's3://{base_aws_bucket}/vfl_predict/active_predictions.csv'
vfl_predict_passive_storage_path = f's3://{base_aws_bucket}/vfl_predict/passive_predictions.csv'

Run a PRL session to align the datasets for VFL-GLM

Before you run a VFL session, you must first run a PRL session to align the datasets.

For more information, see Private Record Linkage (PRL) Sessions.

Review the sample Model Configuration for VFL-GLM

model_config = {
    "strategy": {"name": "VflGlm", "params": {}},
    "model": {
        "passive_client": {"params": {"input_size": 7, "output_activation": "sigmoid"}},
        "active_client": {"params": {"input_size": 8, "output_activation": "sigmoid"}},
    },
    "ml_task": {
        "type": "logistic",
        "params": {},
    },
    "optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
    "seed": 23,  # for reproducibility
}

data_config = {
        "passive_client": {
            "label_client": False,
            "predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"],
            "target": None,
        },
        "active_client": {
            "label_client": True,
            "predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"],
            "target": "y",
        },
    }

integrate.ai has a model class available for Generalized Linear Models, called iai_glm. This model is defined using a JSON configuration file during session creation (model_config).

You can adjust the following parameters as needed:

  * input_size - the number of predictor columns for each client model
  * output_activation - the output activation function for each client model (for example, sigmoid)
  * The optimizer and its parameters (for example, learning_rate and momentum)

For more information, see the scikit documentation for Generalized Linear Models, or Generalized linear model on Wikipedia.

Set the machine learning task type to one of the supported GLM task types, such as logistic, normal, Poisson, gamma, or inverseGaussian.

Specify any parameters associated with the task type in the params section.

The notebook also provides a sample data schema. For the purposes of testing VFL-GLM, use the sample schema as shown.

Create a VFL-GLM training session

vfl_train_session = client.create_vfl_session(
    name="Testing notebook - VFL GLM Train",
    description="I am testing VFL GLM training session creation with a task runner through a notebook",
    prl_session_id=prl_session.id,
    vfl_mode='train',
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_glm",
    data_config=data_config,
    model_config=model_config
).start()

vfl_train_session.id

Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session. Create a session each time you want to train a new model.

The code sample demonstrates creating and starting a session with two training clients (two datasets) and two rounds. It returns a session ID that you can use to track and reference your session.

Create and start a task group to run the training session

# Create and start a task group with one task for the server, and one for each of the clients joining the session

vfl_task_group_context = (SessionTaskGroup(vfl_train_session)\
    .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=active_train_path,
                                    test_path=active_test_path,
                                    batch_size=1024,
                                    client_name="active_client",
                                    storage_path=aws_storage_path))\
    .add_task(iai_tb_aws.vfl_train(train_path=passive_train_path,
                                    test_path=passive_test_path,
                                    batch_size=1024,
                                    client_name="passive_client",
                                    storage_path=aws_storage_path))\
    .start())

The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.

Each client must have a unique name that matches the name specified in the data_config. For example, active_client and passive_client.

Poll for Session Results

# Check the status of the tasks
for i in vfl_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))
vfl_task_group_context.monitor_task_logs()
# Wait for the session to complete
vfl_task_group_context.wait(60*8, 2)

Sessions may take some time to run depending on the compute environment. In the sample notebook and this tutorial, we poll the server to determine the session status.

View the training metrics

metrics = vfl_train_session.metrics().as_dict()
metrics
fig = vfl_train_session.metrics().plot()

After the session completes successfully, you can view the training metrics and start making predictions.

  1. Retrieve the model metrics as_dict.
  2. Plot the metrics.

VFL-GLM Prediction

# Create and start a VFL-GLM prediction session

vfl_predict_session = client.create_vfl_session(
    name="Testing notebook - VFL-GLM Predict",
    description="I am testing VFL-GLM prediction session creation with an AWS task runner through a notebook",
    prl_session_id=prl_session.id,
    training_session_id=vfl_train_session.id,
    vfl_mode="predict",
    data_config=data_config,
).start()

vfl_predict_session.id

# Create and start a task group with one task for the server, and one for each of the clients joining the session

vfl_predict_task_group_context = (SessionTaskGroup(vfl_predict_session)\
.add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
.add_task(iai_tb_aws.vfl_predict(
        client_name="active_client",
        dataset_path=active_test_path,
        raw_output=True,
        batch_size=1024,
        storage_path=vfl_predict_active_storage_path))\
.add_task(iai_tb_aws.vfl_predict(
        client_name="passive_client",
        dataset_path=passive_test_path,
        batch_size=1024,
        raw_output=True,
        storage_path=vfl_predict_passive_storage_path))\
.start())

After you have completed a successful PRL session and a VFL-GLM train session, you can use those two sessions to create a VFL-GLM prediction session.

In the example, the sessions are run in sequence, so the session IDs for the PRL and VFL train sessions are readily available to use in the predict session. If instead you run a PRL session and want to reuse it later in a different VFL session, make sure that you save the session ID (prl_session.id). Then you can provide the session ID directly in the predict session setup instead of relying on the variable. All three sessions must use the same client_name values and data in order to run successfully.
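For example, a minimal sketch of reusing a saved PRL session ID (the name and description strings here are illustrative):

# Save the ID after the PRL session completes
saved_prl_session_id = prl_session.id

# Later, pass the saved ID directly instead of relying on the prl_session variable
vfl_predict_session = client.create_vfl_session(
    name="Predict with a saved PRL session",
    description="Reusing a previously completed PRL session by ID",
    prl_session_id=saved_prl_session_id,
    training_session_id=vfl_train_session.id,
    vfl_mode="predict",
    data_config=data_config,
).start()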

# Check the status of the tasks
for i in vfl_predict_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))
vfl_predict_task_group_context.monitor_task_logs()
# Wait for the tasks to complete (success = True)
vfl_predict_task_group_context.wait(60*5, 2)

View the predictions

# Retrieve the metrics
metrics = vfl_predict_session.metrics().as_dict()
metrics
import pandas as pd
presigned_result_urls = vfl_predict_session.prediction_result()

df_pred = pd.read_csv(presigned_result_urls.get(vfl_predict_active_storage_path))

df_pred.head()

Now you can view the predictions and evaluate the performance.

For more information about VFL predict mode, see VFL Prediction Session Example.

DATA ANALYSIS

EDA in Individual Mode

The Exploratory Data Analysis (EDA) feature for horizontal federated learning (HFL) enables you to access summary statistics about a group of datasets without needing access to the data itself. This allows you to get a basic understanding of the dataset when you don't have access to the data or you are not allowed to do any computations on the data.

EDA is an important pre-step for federated modelling and a simple form of federated analytics. The feature has a built-in differential privacy setting. Differential privacy (DP) noise is dynamically added to each histogram that is generated for each feature in a participating dataset. The added privacy protection causes slight noise in the end result.

The sample notebook (integrate_ai_api.ipynb) provides runnable code examples for exploring the API, including the EDA feature, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.

API Reference for EDA

The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session.

If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.

This tutorial assumes that you have correctly configured your environment for working with integrate.ai, as described in Installing the SDK.

Configure an EDA Session

To begin exploratory data analysis, you must first create a session.

The dataset_config file is a configuration file that maps the name of one or more datasets to the columns to be pulled. Dataset names and column names are specified as key-value pairs in the file.

For each pair, the keys are dataset names that are expected for the EDA analysis. The values are a list of corresponding columns. The list of columns can be specified as column names (strings), column indices (integers), or a blank list to retrieve all columns from that particular dataset.

If a dataset name is not included in the configuration file, all columns from that dataset are used by default.

For example:

To retrieve all columns for a submitted dataset named dataset_one:

dataset_config = {"dataset_one": []}

To retrieve the first column and the column x2 for a submitted dataset named dataset_one:

dataset_config = {"dataset_one": [1,"x2"]}

To retrieve the first column and the column x2 for a submitted dataset named dataset_one and all columns in a dataset named dataset_two:

dataset_config = {"dataset_one": [1,"x2"],"dataset_two": []}

Specifying the number of datasets

You can manually set the number of datasets that are expected to be submitted for an EDA session by specifying a value for num_datasets.

If num_datasets is not specified, the number is inferred from the number of datasets provided in dataset_config.
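For example, a minimal sketch of setting num_datasets explicitly (here a third, unnamed dataset is expected in addition to the two named ones; the session name and description are illustrative):

dataset_config = {"dataset_one": [], "dataset_two": []}

eda_session = client.create_eda_session(
    name="EDA with explicit dataset count",
    description="Expecting a third, unnamed dataset",
    data_config=dataset_config,
    num_datasets=3,  # overrides the count inferred from dataset_config
).start()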

Create and start an EDA session

dataset_config = {"dataset_one": [1,"x5","x7"], "dataset_two": ["x1","x10","x11"]}

eda_session = client.create_eda_session(
    name="Testing notebook - EDA",
    description="I am testing EDA session creation through a notebook",
    data_config=dataset_config
).start()

eda_session.id

The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on two datasets, named dataset_one and dataset_two. It returns an EDA session ID that you can use to track and reference your session.

The dataset_config used here specifies that the first column (1), column x5, and column x7 will be analyzed for dataset_one and columns x1, x10, and x11 will be analyzed for dataset_two.

Since the num_datasets argument is not provided to client.create_eda_session(), the number of datasets is inferred as two from the dataset_config.

For more information, see the create_eda_session() definition in the API documentation.

Analyze the EDA results

The results object is a dataset collection comprised of multiple datasets that can be retrieved by name. Each dataset is comprised of columns that can be retrieved by either column name or by index.

You can perform the same base analysis functions at the collection, dataset, or column level.

results = eda_session.results()

Example output:

EDA Session collection of datasets: ['dataset_two', 'dataset_one']

Describe

Use the .describe() method to retrieve a standard set of descriptive statistics.

If a statistical function is invalid for a column (for example, mean requires a continuous column and x10 is categorical), or the column from one dataset is not present in the other (for example, here x5 is in dataset_one, but not dataset_two), then the result is NaN.

results.describe()

results["dataset_one"].describe()

Statistics

For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.

results["dataset_one"][["x10", "x11"]].uniques()

You can call functions such as .mean(), .median(), and .std() individually.

results["dataset_one"].mean()

results["dataset_one"]["x1"].mean()

Histograms

You can create histogram plots using the .plot_hist() function.

saved_dataset_two_hist_plots = results["dataset_two"].plot_hist()

single_hist = results["dataset_two"]["x1"].plot_hist()

EDA in Intersect Mode

The Exploratory Data Analysis (EDA) Intersect mode enables you to access summary statistics about the intersection of a group of datasets without needing access to the data itself. It is used primarily with VFL use cases to understand statistics about the overlapped records. For example, you may be interested in "what is the distribution of age among the intersection" (intersect mode), which could be a very different answer from "what is the distribution of age among the whole population" (individual mode).

With EDA Intersect, you can:

EDA is an important pre-step for federated modelling and a simple form of federated analytics. This feature has a built-in mechanism to achieve differential privacy. Proper noise is dynamically added to each histogram that is generated for each feature in a participating dataset. This adds extra protection on the raw data by making the final results differentially private, but at the same time it means the final results will deviate slightly from the true values.

API Reference

The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session. If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.

Configure an EDA Intersect Session

Example data paths in S3

aws_taskrunner_profile = "<myworkspace>" # This is your workspace name
aws_taskrunner_name = "<mytaskrunner>" # Task runner name - must match what was supplied in UI to create task runner

base_aws_bucket = f'{aws_taskrunner_profile}-{aws_taskrunner_name}.integrate.ai'

active_train_path = f's3://{base_aws_bucket}/vfl/active_train.parquet'
active_test_path = f's3://{base_aws_bucket}/vfl/active_test.parquet'
passive_train_path = f's3://{base_aws_bucket}/vfl/passive_train.parquet'
passive_test_path = f's3://{base_aws_bucket}/vfl/passive_test.parquet'

#Where to store the trained model
aws_storage_path = f's3://{base_aws_bucket}/model'
eda_data_config = {"active_client":[],"passive_client":[]} 
prl_session_id = "<PRL_SESSION_ID>"

This example uses an AWS task runner to run the session using data in S3 buckets. Ensure that you have completed the AWS configuration for task runners and that a task runner exists. See Create an AWS task runner for details.

To begin exploratory data analysis, you must first create a session. To configure the session, specify the following:

The eda_data_config specifies the names of the datasets used to generate the intersection in the PRL session in the format dataset_name : columns. If columns is empty ([]), then EDA is performed on all columns.

You must also specify the session ID of a previous successful PRL session that used the same datasets listed in the eda_data_config.

Paired Columns

paired_cols = {"active_client": ['x0', 'x2'], "passive_client": ['x1']}

To find the correlation (or any other binary operation) between two specific columns, you must specify those columns as paired columns to ensure that the 2D histogram is calculated for them. Otherwise, only 1D histograms are generated for the columns separately, similar to individual mode, except that they are constructed with only the overlapping samples from the intersection. If you are not interested in bivariate operations, you can still run EDA in Intersect mode and leave paired_columns empty.

To set which pairs you are interested in, specify their names in a dictionary like data_config.

For example: {"passive_client": ['x1', 'x5'], "active_client": ['x0', 'x2']}

will generate 2D histograms for these pairs of columns:

(x0, x1), (x0, x5), (x2, x1), (x2, x5), (x0, x2), (x1, x5)
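The pairing logic can be reproduced with a quick sketch (for illustration only): all cross-client combinations, plus the within-client combinations.

from itertools import combinations, product

paired_cols = {"passive_client": ["x1", "x5"], "active_client": ["x0", "x2"]}

# Cross-client pairs, then within-client pairs
cross = list(product(paired_cols["active_client"], paired_cols["passive_client"]))
within = list(combinations(paired_cols["active_client"], 2)) + \
         list(combinations(paired_cols["passive_client"], 2))

print(cross + within)
# [('x0', 'x1'), ('x0', 'x5'), ('x2', 'x1'), ('x2', 'x5'), ('x0', 'x2'), ('x1', 'x5')]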

Create an EDA intersect session

Import the task builder and task group from the SDK.

import integrate_ai_sdk
from integrate_ai_sdk.taskgroup.taskbuilder.integrate_ai import IntegrateAiTaskBuilder
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
from integrate_ai_sdk.taskgroup.taskbuilder import taskrunner_context

iai_tb_aws = IntegrateAiTaskBuilder(client=client,
   task_runner_id=aws_taskrunner_name)

Create the EDA intersect sessions

eda_session = client.create_eda_session(
    name="EDA Intersect Session",
    description="Testing EDA Intersect mode through a notebook",
    data_config=eda_data_config,
    eda_mode="intersect",  # Generates histograms on an overlap of two distinct datasets
    prl_session_id=prl_session_id,
    hide_intersection=True,   # Optional - specifies whether to apply CKKS encryption when generating the output. Defaults to True.
    paired_columns=paired_cols  # Optional - only required to generate 2D histograms
).start()

eda_session.id

The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on the intersection of two distinct datasets. It returns an EDA session ID that you can use to track and reference your session.

Note that, unlike other sessions, you do not need to specify a model_config file for EDA. This is because there is only one editable parameter, hide_intersection, and it is passed into the session at creation instead of through a separate config file.

For more information, see the create_eda_session() definition in the API documentation.

Create and run the task group

# Create a task group with one task for the server, and one for each of the clients joining the session. If you have registered your dataset(s), specify only the `dataset_name` (no `dataset_path`). 

eda_task_group_context = (
        SessionTaskGroup(eda_session) \
        .add_task(iai_tb_aws.fls(storage_path=aws_storage_path))\
        .add_task(iai_tb_aws.eda(dataset_name="active_train", dataset_path=active_train_path))\
        .add_task(iai_tb_aws.eda(dataset_name="passive_train", dataset_path=passive_train_path))\
        .start()
    )

Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.

Monitor submitted EDA Intersect jobs

for i in eda_task_group_context.contexts.values():
    print(json.dumps(i.status(), indent=4))

eda_task_group_context.monitor_task_logs()

# Wait for the tasks to complete (success = True)

eda_task_group_context.wait(60*5, 2)

Submitted tasks are in the Pending state until the clients join and the session is started. Once started, the status changes to Running.

When the session completes successfully, "True" is returned. Otherwise, an error message appears.

Analyze the results

To retrieve the results of the session:

results = eda_session.results()

Example output:

EDA Session collection of datasets: ['active_client', 'passive_client']

Describe

You can use the .describe() function to review the results.

results.describe()

Example output:

Statistics

For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.

results["active_client"][["x10", "x11"]].uniques()

Example output:

Mean

You can call functions such as .mean(), .median(), and .std() individually.

results["active_client"].mean()

Example output:

Histograms

You can create histogram plots using the .plot_hist() function.

saved_active_client_hist_plots = results["active_client"].plot_hist()

Example output:

single_hist = results["active_client"]["x10"].plot_hist()

Example output:

2D Histograms

You can also plot 2D-histograms of specified paired columns.

# Retrieve the dataset objects by name so their columns can be referenced directly
active_client = results["active_client"]
passive_client = results["passive_client"]

fig = results.plot_hist(active_client['x0'], passive_client['x1'])

Example output:

Correlation

You can perform binary calculations on columns specified in paired_columns, such as finding the correlation.

results.corr(active_client['x0'], passive_client['x1'])

Example output:

Addition, subtraction, division

Addition example. Change the operator to try subtraction, division, etc.

op_res = active_client['x0'] + passive_client['x1']
fig = op_res.plot_hist()

Example output:

GroupBy

groupby_result = results.groupby(active_client['x0'])[passive_client['x5']].mean()
print(groupby_result)

Principal Component Analysis (PCA)

Principal Component Analysis (PCA) is a dimensionality reduction technique that helps rule out features that are not important to the model. In other words, it generates a transformation to apply on a dataset to simplify the number of features used while retaining enough information in the features to represent the original dataset.

The main use of PCA is to help researchers uncover the internal pattern underlying the data. This pattern is then helpful mainly in two applications:

  1. Discovering new groups and clusters (for example, among patients).
  2. More efficient modelling: achieving the same or even better predictive power with less input and simpler models.

The iai_pca session outputs a transformation matrix that is generated in a distributed manner. This transformation can be applied to each dataset of interest.

The underlying technology of this session type is FedSVD (federated singular value decomposition), which can be used in other dimensionality reduction methods.

For a real-world example of using a PCA session, see the pca_face_decomposition.ipynb notebook available in the sample repository. Sample data for running this notebook is available in a zip file here.

Configure a PCA Session

model_config = {
    "experiment_name": "test_pca",
    "experiment_description": "test_pca",
    "strategy": {"name": "FedPCA", "params": {}},
    "model": {
        "params": {
            "n_features": 10,   # optional
            "n_components": 3,
            "whiten": False
        }
    },
}

data_config = {
    "predictors": []
}

This tutorial assumes that you have correctly configured your environment for working with integrate.ai, as described in Installing the SDK and Using integrate.ai. It uses locally run Docker containers for the client and server.

This example describes how to train an HFL session with the built-in package iai_pca, which performs the principal component analysis on the raw input data, and outputs the desired number of principal axes in the feature space along with the corresponding eigenvalues. The resulting model can be used to transform data points from the raw input space to the principal component space.

Sample model config and data config for PCA

In the model_config, use the strategy FedPCA with the iai_pca package.

There are three parameters that can be configured:

  * n_features - the number of input features in the dataset (optional)
  * n_components - the number of principal components (axes) to keep
  * whiten - whether to whiten the output components

The example performs PCA on the 10 input features and outputs the first 3 principal axes.

Create and start a PCA training session

pca_session = client.create_fl_session(
    name="Testing PCA ",
    description="Testing a PCA session through a notebook",
    min_num_clients=2,
    num_rounds=2,
    package_name="iai_pca",
    model_config=model_config,
    data_config=data_config,
).start()

pca_session.id

Note that the num_rounds parameter is ignored when iai_pca is used, because PCA sessions determine the number of rounds required automatically.

Make sure that the sample data you downloaded is either saved to your ~/Downloads directory, or you update the data_path below to point to the sample data.

data_path = "~/Downloads/synthetic"

Create a task group for the server and the two clients.

from integrate_ai_sdk.taskgroup.taskbuilder import local
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup


tb = local.local_docker(
    client,
    docker_login=False,
)

task_group_context = (
    SessionTaskGroup(pca_session)
    .add_task(tb.fls(storage_path=storage_path, docker_options={"cpu_shares": 512}))\
    .add_task(tb.hfl(train_path=train_path1, test_path=test_path, client_name="client1"))\
    .add_task(tb.hfl(train_path=train_path2, test_path=test_path, client_name="client2"))\
    .start()
)

Wait for the session to complete successfully. This code will return True when it is complete.

task_group_context.wait(60 * 5)

View PCA session results

Note that the reverse process of mapping data points from the principal component space back to the original feature space can be treated as a multivariate regression task (i.e., reconstruct the raw features). We log some regression metrics (e.g., loss/mse and r2) for PCA sessions. They can be pulled as follows.

pca_session.metrics().as_dict()

The PCA results can be retrieved as follows. It is a standard PyTorch model object, and the .state_dict() method can be used to show all stored parameters, including the principal axes matrix and eigenvalues.

pca_transformation = pca_session.model().as_pytorch()
pca_transformation.state_dict()

We can also transform data points from the original feature space to the principal component space by directly calling the model object on the data tensors.

import pandas as pd

test_data = pd.read_parquet(f"{data_path}/test.parquet")
test_data.head()
import torch

test_data_tensor = torch.tensor(test_data[data_config["predictors"]].values)
test_data_pc = pca_transformation(test_data_tensor)
test_data_pc.shape

DATA VALUATION

Data Consumers perform data evaluation with integrate.ai to understand how valuable a Data Provider's data product could be to their data science tasks or business problems.

There are two core questions Data Consumers typically seek to answer when evaluating a model or data product:

  1. How much data from a data product is usable in reference to my internal data?
  2. How relevant or useful is the data product to my data science task, use case, or project?

Our data valuation features help data scientists understand the value of a dataset as it relates to its contribution to a particular model.

integrate.ai provides features to address data valuation at two different levels:

  1. Dataset Influence Score that quantifies and ranks the influence each dataset has on a model.
  2. Feature Importance Score that quantifies and ranks features across contributing datasets, based on their influence on a global model.

We use the Shapley values from game theory and their related extensions to determine the scores.

Where they can be applied

The calculation of these scores is built into our vertical federated modeling sessions. Influence results are calculated during every round of the federated training session and are displayed with the results of the session to show how these scores trend over the training rounds. Feature importance is only evaluated once, at the end of the training.

The Dataset Influence Score and Feature Importance Score are applicable to regression/classification tasks for VFL. This applies to the following model types:

The integrateai_taskrunner_AWS.ipynb notebook contains an example of calculating and plotting the scores for both dataset influence and feature importance for a SplitNN VFL session. The example plots shown in this section are generated using this notebook and the sample datasets provided by integrate.ai.

Note: A completed successful PRL session (prl_session_id) with the appropriate overlap is required input for the VFL session. For more information about PRL sessions, see Private Record Linkage (PRL) sessions. For an overview of VFL SplitNN, see VFL SplitNN Model Training.

Dataset influence

Dataset influence measures the impact of an individual dataset (that is, a data product) on the performance of a machine learning model trained using the integrate.ai platform. By running dataset influence, a Data Consumer can understand to what extent a Data Provider's data product contributed to the performance of a machine learning model they trained. This feature can be used in conjunction with other evaluation jobs to help Data Consumers answer the question of data product relevance to their project.

This score is based on multi-round Shapley values. It can be shown as a percentage of overall influence of clients on the final model (shown in bar plot below) or as an absolute score (shown in line graph below). The absolute score can be plotted on a per round basis, to see how the influence stabilizes as training progresses.

Calculating Influence Scores

# Dataset influence scores are enabled by default.
    "influence_score": {
        "enable": True,
        "params": {
            "rel_thresh": 0.01,
            "max_eval_len": None
        }
    },

This feature is enabled by default.

To view influence scores, add the sample code shown to your VFL model_config.

Parameters:

Plotting Influence Scores

Add the following sample code to plot overall influence of clients on the final model.

vfl_train_influence_scores = vfl_train_session.influence_scores(all_metrics=False)
_ = vfl_train_influence_scores.plot()

This produces a bar plot.

Add the following sample code to plot the per round influence of clients on the final model.

_ = vfl_train_influence_scores.plot(per_round=True)

This produces a line graph.

Feature Importance

Data Consumers can use feature importance to understand how specific features in a Data Provider's data product impacted the performance of a machine learning model they trained. They can also compare features to the features in their own data. Note that if the data product consists of only one feature, we recommend using Dataset Influence instead of Feature Importance. Feature importance can be used in conjunction with other evaluation jobs to help Data Consumers answer the question of data product relevance to their project.

This score is based on the "SHapley Additive exPlanations" (SHAP) method and is single-round.

Use the SDK to plot this score as a beeswarm or a bar plot. The beeswarm plot represents the individual SHAP value for each data point in the feature. The bar plot shows the overall importance of a feature - the bigger the magnitude (absolute value), the more important the feature. The overall importance of a feature is the average of the absolute SHAP values across samples.
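As a concrete illustration of that last point (the SHAP values here are made up):

import numpy as np

# Hypothetical per-sample SHAP values for one feature
shap_values = np.array([0.12, -0.40, 0.05, -0.08])

# Overall importance is the mean of the absolute per-sample values
overall_importance = np.abs(shap_values).mean()
print(overall_importance)  # 0.1625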

This feature is disabled by default.

Calculating Feature Importance Scores

# Feature importance scores are disabled by default. To enable, set "enable" to True.
    "feature_importance_score": {
        "enable": True,
        "params": {
            "max_evals": None,
            "subsample": None,
            "random_seed": 23,
        }
    },

To enable feature importance, add the sample code shown to your VFL model_config.

Note: Feature importance cannot be enabled if you have enabled hiding intersections (hide_intersection=True). When the intersection is hidden, feature importance calculations are disabled regardless of how the feature option is set.

At a high level, feature importance is a type of sensitivity analysis. For one given real sample, we generate a set of “fake samples” by perturbing a subset of feature values, then check how the model output changes accordingly. The parameters determine the setting for this sensitivity analysis.
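A rough sketch of that idea for a single feature (illustrative only; the actual scores are computed with SHAP):

import numpy as np

def output_sensitivity(model, sample, feature_idx, n_perturb=100, scale=0.1):
    """Perturb one feature of a real sample and measure the average change in model output."""
    base = model(sample.reshape(1, -1))
    fakes = np.repeat(sample.reshape(1, -1), n_perturb, axis=0)
    fakes[:, feature_idx] += np.random.normal(0.0, scale, size=n_perturb)
    return float(np.mean(np.abs(model(fakes) - base)))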

Parameters:

Plotting Feature Importance Scores

To display the calculated feature importance scores, add the following sample code.

vfl_feature_importance = vfl_train_session.feature_importance_scores()
vfl_feature_importance.as_dict()["feature_importance_scores"]

Example output:

To plot the importance scores for the top 15 features, add the following sample code.

_ = vfl_feature_importance.plot(topk=15)

This produces a bar plot.

To generate the SHAP value box plots, add the following sample code.

_ = vfl_feature_importance.plot(style="box", topk=15)

This produces two box plots.

To list the feature importance score details, add the following sample code.

vfl_feature_importance.as_dict()["feature_importance_details"][0]["active_client"]

Example output:

REFERENCE

Model Packages

Feed Forward Neural Nets (iai_ffnet)

Feedforward neural nets are a type of neural network in which information flows through the nodes in a single direction.

Examples of use cases include:

HFL FFNet

The iai_ffnet model is a feedforward neural network for horizontal federated learning (HFL) that uses the same activation for each hidden layer.

This model only supports classification and regression. Custom loss functions are not supported.

Privacy

DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.
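A minimal sketch of that idea, assuming per-sample gradients are already available (this illustrates DP-SGD's clip-and-noise step, not the platform's implementation):

import torch

def dp_sgd_aggregate(per_sample_grads, max_grad_norm, noise_multiplier):
    # Clip each per-sample gradient so no single example dominates the update
    clipped = []
    for g in per_sample_grads:
        factor = min(1.0, max_grad_norm / (g.norm().item() + 1e-12))
        clipped.append(g * factor)
    # Average the clipped gradients, then add Gaussian noise calibrated to the clip bound
    grad = torch.stack(clipped).mean(dim=0)
    noise = torch.randn_like(grad) * (noise_multiplier * max_grad_norm / len(clipped))
    return grad + noise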

VFL SplitNN

integrate.ai also supports the SplitNN model for vertical federated learning (VFL). In this model, neural networks are trained with data across multiple clients. A PRL (private record linkage) session is required for all datasets involved. There are two types of sessions: train and predict. To make predictions, the PRL session ID and the corresponding training session ID are required.

For more information, see PRL Session and VFL SplitNN Model Training.

Generalized Linear Models (GLMs)

This model class supports a variety of regression models. Examples include linear regression, logistic regression, Poisson regression, gamma regression and inverse Gaussian regression models. We also support regularizing the model coefficients with the elastic net penalty.

Examples of use cases include [1]:

The iai_glm model trains generalized linear models by treating them as a special case of single-layer neural nets with particular output activation functions.

Privacy

DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.

References [1]: https://scikit-learn.org/stable/modules/linear_model.html#generalized-linear-regression

Strategy Library

Reference guide for available training strategies.

FedAvg

Federated Averaging strategy implemented based on https://arxiv.org/abs/1602.05629

Parameters

FedAvgM

Federated Averaging with Momentum (FedAvgM) strategy https://arxiv.org/pdf/1909.06335.pdf

Parameters

Uses the same parameters as FedAvg as well as the following:

FedAdam

Adaptive Federated Optimization using Adam (FedAdam) https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedAdagrad

Adaptive Federated Optimization using Adagrad (FedAdagrad) strategy https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedOpt

Adaptive Federated Optimization (FedOpt) abstract strategy https://arxiv.org/abs/2003.00295

Parameters

Uses the same parameters as FedAvg as well as the following:

FedYogi

Federated learning strategy using Yogi on server-side https://arxiv.org/abs/2003.00295v5

Parameters

Evaluation metrics

Metrics are calculated for each round of training.

When the session is complete, you can see a set of metrics for all rounds of training, as well as metrics for the final model.

Retrieve Metrics for a Session

Use the SessionMetrics class of the API to store and retrieve metrics for a session. You can retrieve the model performance metrics as a dictionary (Dict), or plot them. See the API Class Reference for details.

Typical usage example:

client = connect("token") 

already_trained_session_id = "<sessionID>"

session = client.fl_session(already_trained_session_id)

# retrieve the metrics for the session as a dictionary
metrics = session.metrics().as_dict()
  1. Authenticate and connect to the integrate.ai client.
  2. Provide the session ID that you want to retrieve the metrics for as already_trained_session_id.
  3. Call the SessionMetrics class.
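You can also plot the metrics, as in the training examples earlier in this documentation:

# Plot the model performance metrics for the session
fig = session.metrics().plot()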

Available Metrics

The Federated Loss value for the latest round of model training is reported as the global_model_federated_loss (float) attribute for an instance of SessionMetrics.

This is a model level metric reported for each round of training. It is a weighted average loss across different clients, weighted by the number of examples/samples from each silo.
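As a small worked example of that weighting (the numbers here are made up):

# Hypothetical per-client losses and example counts
client_losses = {"client1": 0.62, "client2": 0.70}
client_examples = {"client1": 8000, "client2": 2000}

total = sum(client_examples.values())
federated_loss = sum(client_losses[c] * client_examples[c] / total for c in client_losses)
print(federated_loss)  # 0.62 * 0.8 + 0.70 * 0.2 = 0.636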

See the metrics by machine learning task in the following table:

| Classification and Logistic | Regression and Normal | Poisson, Gamma, Inverse Gaussian |
| --- | --- | --- |
| Loss (cross-entropy) | Loss (mean squared error) | Loss (unit deviance) |
| ROC_AUC | R2 score | R2 score |
| Accuracy | | |

Building a Custom Model

If the standard integrate.ai models (FFNet, GLM, GBM) do not suit your needs, you can create a custom model. If you are working with non-tabular data, you can also create a custom dataloader to use with your custom model.

integrate.ai supports all custom models under PyTorch (for example, CNNs, LSTMs, Transformers, and GLMs).

Most supervised learning tasks with models that only include DP-compatible modules (either natively compatible, or convertible to a compatible substitute) are supported:

  * Some modules (for example, LSTM) are not compatible with DP-SGD out of the box and must be replaced by their DP equivalents.
  * BatchNorm is not supported due to the nature of DP-SGD and must be replaced by alternative modules, as in the sketch below.
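A common substitute is GroupNorm; a minimal sketch of swapping it in before training with DP-SGD (illustrative only, not platform code):

import torch.nn as nn

class DpFriendlyBlock(nn.Module):
    """Conv block using GroupNorm, a DP-SGD-compatible substitute for BatchNorm."""
    def __init__(self, channels: int):
        super().__init__()
        self.conv = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        # GroupNorm instead of nn.BatchNorm2d(channels); channels must be divisible by num_groups
        self.norm = nn.GroupNorm(num_groups=8, num_channels=channels)
        self.act = nn.ReLU()

    def forward(self, x):
        return self.act(self.norm(self.conv(x)))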

Customization is restricted in the following ways:

Download template and examples

Start by cloning the integrateai-samples repo. This repo contains everything you need to learn about and create your own custom model package. Review these examples and the API documentation before getting started.

The repo contains several folders, some of which are relevant to custom models:

Create a custom model package

Using the template files provided, create a custom model package.

Follow the naming convention for files in the custom package: no spaces, no special characters, no hyphens, all lowercase characters.

  1. Create a folder to contain your custom model package. For this tutorial, this folder is named myCustomModel, and is located in the same parent folder as the template folder.

Example path: C:\<workspace>\integrate_ai_sdk\sample_packages\myCustomModel

  2. Create two files in the custom model package folder:

     a. model.py - the custom model definition. You can rename template_model.py to use as a starting point for this file.

     b. <model-class-name>.json - default model inputs for this model. It must have the same name as the model class name that is defined in the model.py file.

If you are using the template files, the default name is templatemodel.json.

  3. Optional: To use a custom dataloader, you must also create a dataset.py file and a dataset configuration JSON file in the same folder. For more information, see #.

If there is no custom dataset file, the default TabularDataset loader is used. It loads .parquet and .csv files, and requires predictors and a target (for example, predictors: ["x1", "x2"], target: "y") as input in the data configuration, as shown below. This is what the standard models use.
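
For example, a minimal data configuration for the default TabularDataset loader might look like the following (the column names are illustrative):

{
    "predictors": ["x1", "x2"],
    "target": "y"
}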

The example below provides the boilerplate for your custom model definition. Fill in the code required to define your model. Refer to the model.py files provided for the lstmTagger and cifar10_vgg16 examples if needed.

from integrate_ai_sdk.base_class import IaiBaseModule


class TemplateModel(IaiBaseModule):
    def __init__(self):
        """
        Instantiate your model layers here, based on your model configuration.
        """
        super(TemplateModel, self).__init__()

    def forward(self, x):
        """
        The forward pass of the model. Takes an input tensor and returns a prediction tensor.
        """
        pass


if __name__ == "__main__":
    template_model = TemplateModel()
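
As an illustration only (not one of the shipped examples), a completed template for a small feed-forward classifier might look like the sketch below. It assumes that IaiBaseModule behaves like a standard torch.nn.Module and that the constructor arguments are supplied through the params object of your model configuration JSON:

import torch
import torch.nn as nn

from integrate_ai_sdk.base_class import IaiBaseModule


class MySimpleNet(IaiBaseModule):
    def __init__(self, input_size, hidden_size, output_size):
        # hypothetical layer sizes, supplied through the "params" object
        # in the model configuration JSON (mysimplenet.json)
        super(MySimpleNet, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        # x is a batch of predictor tensors; returns raw class scores
        return self.layers(x)


if __name__ == "__main__":
    model = MySimpleNet(input_size=200, hidden_size=32, output_size=2)
    print(model(torch.randn(4, 200)).shape)  # expected: torch.Size([4, 2])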

Custom model configuration inputs

Create a JSON file that defines the model inputs for your model.

The file must have the same name as the model class that is defined in the model.py file (for example, templatemodel.json). The content of this file is dictated by your model.

Tip: File paths can be specified as S3 URLs.

The following parameters are required:

| Parameter (type) | Description |
| --- | --- |
| experiment_name (string) | The name of your experiment. |
| experiment_description (string) | A description of your experiment. |
| strategy (object) | The federated learning strategy to use and any required parameters. Supported strategies are: FedAvg, FedAvgM, FedAdam, FedAdagrad, FedOpt, FedYogi. |
| model (object) | The model type and any required parameters. |
| ml_task (object) | The machine learning task type and any required parameters. Supported types are: regression, classification, logistic, normal, poisson, gamma, and inverseGaussian. |
| optimizer (object) | The optimizer and any required parameters. See the PyTorch optimizer package (https://pytorch.org/docs/stable/optim.html) for details. |
| differential_privacy_params (object) | The differential privacy parameters: epsilon (the privacy budget; larger values correspond to less privacy protection and potentially better model performance), max_grad_norm, and delta. |

Example: FFNet model using a FedAvg strategy

{
    "experiment_name": "Test session",
    "experiment_description": "This is a test session",
    "job_type": "training",
    "strategy": {
        "name": "FedAvg",
        "params": {}
        },
    "model": {
        "type": "FFNet",
        "params": {
            "input_size": 200,
            "hidden_layer_sizes": [80,40,8],
            "output_size": 2,
            "hidden_activation": "relu"
            }
    },
    "ml_task": "classification",
    "optimizer": {
        "name": "SGD",
        "params": {
            "learning_rate": 0.03,
            "momentum": 0
        }
    },
    "differential_privacy_params": {
        "epsilon": 1,
        "max_grad_norm": 7,
        "delta": 0.000001
    },
    "eval_metrics": [
        "accuracy",
        "loss",
        "roc_auc"
    ]
}

Reference model validation schema

Below is the outline for the full schema used to validate the model configuration inputs for GLM and FFNet models. This schema is provided for reference.

{
  "$schema": "https://json-schema.org/draft-07/schema#",
  "title": "FL Model Config",
  "description": "The model config for an FL model",
  "type": "object",
  "properties": {
    "experiment_name": {
      "type": "string",
      "description": "Experiment Name"
    },
    "experiment_description": {
      "type": "string",
      "description": "Experiment Description"
    },
    "strategy": {
      "type": "object",
      "properties": {
        "name": {
          "enum": [
            "FedAvg"
          ],
          "description": "Name of the FL strategy"
        },
        "params": {
          "type": "object",
          "properties": {
            "fraction_fit": {
              "type": "number",
              "minimum": 0,
              "maximum": 1,
              "description": "Proportion of clients to use for training. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
            },
            "fraction_eval": {
              "type": "number",
              "minimum": 0,
              "maximum": 1,
              "description": "Proportion of clients to use for evaluation. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
            },
            "accept_failures": {
              "type": "boolean",
              "description": "Whether to accept failures during training and evaluation. If False, the training process will be stopped when a client fails, otherwise, the failed client will be ignored."
            }
          },
          "additionalProperties": false
        }
      },
      "required": [
        "name",
        "params"
      ]
    },
    "model": {
      "type": "object",
      "description": "Model type and parameters",
      "properties": {
        "params": {
          "type": "object",
          "description": "Model parameters"
        }
      },
      "required": [
        "params"
      ]
    },
    "ml_task": {
      "type": "object",
      "description": "Type of ML task",
      "properties": {
        "type": {
          "enum": [
            "regression",
            "classification",
            "logistic",
            "normal",
            "poisson",
            "gamma",
            "inverseGaussian"
          ]
        },
        "params": {
          "type": "object"
        }
      },
      "required": ["type", "params"],
      "allOf": [
        {
          "if": {
            "properties": { "type": { "enum": ["regression", "classification"] } }
          },
          "then": {
            "properties": { "params": { "type": "object" } }
          }
        },
        {
          "if": {
            "properties": { "type": { "enum": [
              "logistic",
              "normal",
              "poisson",
              "gamma",
              "inverseGaussian"
            ] } }
          },
          "then": {
            "properties": { "params": {
              "type": "object",
              "properties": {
                "alpha": {
                  "type": "number",
                  "minimum": 0
                },
                "l1_ratio": {
                  "type": "number",
                  "minimum": 0,
                  "maximum": 1
                }
              },
              "required": ["alpha", "l1_ratio"]
            } }
          }
        }
      ]
    },
    "optimizer": {
      "type": "object",
      "properties": {
        "name": {
          "enum": [
            "SGD"
          ]
        },
        "params": {
          "type": "object",
          "properties": {
            "learning_rate": {
              "type": "number",
              "minimum": 0,
              "description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
            },
            "momentum": {
              "type": "number",
              "minimum": 0,
              "description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
            }
          },
          "required": [
            "learning_rate"
          ],
          "additionalProperties": false
        }
      },
      "required": [
        "name",
        "params"
      ],
      "additionalProperties": false
    },
    "differential_privacy_params": {
      "type": "object",
      "properties": {
        "epsilon": {
          "type": "number",
          "minimum": 0,
          "description": "Privacy budget. Larger values correspond to less privacy protection, and potentially better model performance."
        },
        "max_grad_norm": {
          "type": "number",
          "minimum": 0,
          "description": "The upper bound for clipping gradients. A hyper-parameter to tune."
        }
      },
      "required": [
        "epsilon",
        "max_grad_norm"
      ]
    },
    "eval_metrics": {
      "description": "A list of metrics to use for evaluation",
      "type": "array",
      "minItems": 1,
      "items": {}
    }
  },
  "required": [
    "experiment_name",
    "experiment_description",
    "strategy",
    "model",
    "ml_task",
    "optimizer",
    "differential_privacy_params"
  ]
}
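
If you want to sanity-check a configuration against this schema before uploading, one option (purely illustrative, and not part of the SDK) is the jsonschema Python package. The file names below are hypothetical:

import json

from jsonschema import validate, ValidationError  # pip install jsonschema

# load a local copy of the reference schema above and your model configuration
with open("fl_model_config_schema.json") as f:
    schema = json.load(f)
with open("templatemodel.json") as f:
    config = json.load(f)

try:
    validate(instance=config, schema=schema)
    print("Model configuration is valid.")
except ValidationError as e:
    print(f"Invalid model configuration: {e.message}")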

Test and upload the custom model

Before you start training your custom model, you should test it and upload it to your workspace. The method for uploading also tests the model by training a single epoch locally. After the model has been successfully uploaded, you or any user with access to the model can train it in a session.

To test and upload a custom model, use the upload_model method:

def upload_model(
    self,
    package_path: str,
    dataset_path: str,
    package_name: str,
    sample_model_config_path: str,
    sample_data_config_path: str,
    batch_size: int,
    task: str,
    test_only: bool,
    description: str
):

where:

| Argument (type) | Description |
| --- | --- |
| package_path (str) | Path to your custom model folder. |
| dataset_path (str) | Path to the dataset(s). |
| package_name (str) | Name of the custom model package. It must be unique among previously uploaded package names. |
| sample_model_config_path (str) | Path to the model configuration JSON file. |
| sample_data_config_path (str) | Path to the dataset configuration JSON file. |
| batch_size (int) | Number of samples to propagate through the network at a time. |
| task (str) | Either 'classification' or 'regression'. Set it to 'regression' for a numeric target and 'classification' for a categorical target. |
| test_only (bool) | If True, performs one epoch of training to test the model without uploading it. If False, tests the model and uploads it if the test passes. |
| description (str) | A description of the model, up to 1024 characters. This description also appears in the integrate.ai web portal. |

This method tests a custom model by creating the model based on the custom model configuration (JSON file) and then training it with one epoch locally. If the model fails the test, it cannot be uploaded.
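
For example, a call might look like the following sketch, where client is the connection created with connect() and all paths and names are illustrative:

client.upload_model(
    package_path="./myCustomModel",  # folder containing model.py and the config JSON files
    dataset_path="./sample_data/train.csv",
    package_name="my_custom_model",
    sample_model_config_path="./myCustomModel/templatemodel.json",
    sample_data_config_path="./myCustomModel/dataset_config.json",
    batch_size=16,
    task="classification",
    test_only=True,  # dry run: train one epoch locally without uploading
    description="Testing a custom model package before upload",
)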

When starting a session with your custom model, make sure you specify the correct package_name, model_config, and data_config file names. For details, see create_fl_session in the API documentation.

User Authentication

Sharing access to training sessions and shared models in a simple and secure manner is a key requirement for many data custodians. integrate.ai provides a secure method of authenticating end users through the SDK with limited, scoped permissions.

As the user responsible for managing access through the integrate.ai platform, you have the ability to generate an unscoped API token through the integrate.ai UI. Unscoped API tokens provide full access to the integrate.ai SDK. You can run client training tasks locally, or on remote data.

In the case that you want to create a token that has limited access, to enforce governance standards or provide an end user of your platform with limited access to the integrate.ai SDK, you can create scoped API tokens. Scoped tokens grant limited permissions, which enables you to control the level of access to trained sessions and models.

In the UI, you can view your personal API tokens as well as all scoped API tokens created in your organization's workspace through the SDK. These scoped user tokens are designed for use with the integrate.ai SDK. Tokens are tied to user identities through a unique ID, which is logged with each user action.

Limiting user access by token greatly reduces the security risk of leaked credentials. For example, with an unscoped API token, a user could run tasks on a remote machine, where there is a risk that it could be leaked or exposed because it is shared in an outside (non-local) environment. To mitigate that risk, you can instead provide the user with a scoped token that has limited permissions and a short lifespan (maximum 30 days).

Create an unscoped token

As the user who manages other users' access, you must first create your own unscoped token.

  1. Log in to your workspace through the portal.
  2. On the Dashboard, click Generate Access Token.
  3. Copy the access token and save it to a secure location.

Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.

Install components

Install the integrate.ai command-line tool (CLI), the SDK, and the client. For detailed instructions, see Install integrate.ai SDK and Client.

Open sample notebook

Open the Authentication sample notebook (integrateai_auth.ipynb) located in the SDK package.

...integrate_ai_sdk/src/integrate_ai_sdk/sample_packages/sample_notebook/

The notebook provides sample code that demonstrates how to use the SDK to generate users and tokens.

Create a user

import os

from integrate_ai_sdk.auth import connect_to_auth_client
from integrate_ai_sdk.auth.scopes import Scope

IAI_TOKEN = os.environ.get("IAI_TOKEN")
auth_client = connect_to_auth_client(token=IAI_TOKEN)

Create a connection to the auth client with your unscoped token.

user_name = '{user_name}'
user = auth_client.create_user(user_name)

Create a user. Specify a user name (for example, demo-user, or user1@mycompany.com).

The SDK generates a unique ID for the user in the integrate.ai platform.

Example output:

01/27/2023 11:20:24:INFO:Machine user demo-user(f1bd9ff87c@integrate.ai) created by <your-email>.
01/27/2023 11:20:24:INFO:User activated.

Create a scoped token for the user

token = auth_client.create_token(user_id=user_name, scopes=[Scope.create_session, Scope.read_user_session])
print(token)

Create a scoped token for the user. Include only the scopes that the user requires to work with the system and their data.

This request returns the unique user ID (the generated email), a list of the granted scopes, and the token, as well as the token ID and the user name.

Copy and save the token somewhere secure to share with the user.
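
The end user can then authenticate to the SDK with the scoped token in the same way as with an unscoped token. A minimal sketch, assuming the token is stored in the IAI_TOKEN environment variable (the import path is an assumption; use the one from your SDK installation):

import os

from integrate_ai_sdk.api import connect

# the end user connects with the scoped token; only the granted scopes are permitted
client = connect(os.environ.get("IAI_TOKEN"))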

Available Scopes

Verify user and token through the SDK

The token_info command allows you to inspect the details of a token. You must specify the token to inspect.

auth_client.token_info(token['token'])

Example output:

{
  'customer': 'your-environment-name',
  'email': 'generated-email@integrate.ai',
  'realm': 'ifl',
  'role': 'admin',
  'env': 'prod',
  'token_id': '55a19b5d077d40a798aa51ace57168c3',
  'iss': 'integrate.ai',
  'iat': 1674832855,
  'renewable': True,
  'scope': 'create_session read_model read_session read_user_session',
  'user_id': 'demo-user',
  'user_type': 'generated',  # indicates whether the user was created through the SDK
  'active': True
}

Verify user and token through the UI

To confirm that the user and token were created successfully, you can also view them in the web dashboard.

  1. Log in to the web dashboard.
  2. Click Token Management.
  3. Click User Scoped Tokens.
  4. Locate the user name for the user you created.

Revoke a scoped token

User scoped tokens have a default lifespan of thirty (30) days. To revoke a token before it expires, use the revoke_token command in the SDK.

You must provide the token_id for the token that you want to revoke. You can find this ID in the web dashboard.

auth_client.revoke_token(token['token_id'])

Delete a user

Users that you create through the SDK can be deleted through the SDK.

Specify the name of the user that you want to delete.

auth_client.delete_user(user_name)

PRIVACY & SECURITY

Differential privacy

What is differential privacy?

Differential privacy is a technique for providing a provable measure of how “private” a data set can be. This is achieved by adding a certain amount of noise when responding to a query on the data. A balance needs to be struck between adding too much noise (making the computation less useful), and too little (reducing the privacy of the underlying data).

The technique introduces the concept of a privacy-loss parameter (typically represented by ε (epsilon)), which can be thought of as the amount of noise to add for each invocation of some computation on data. A related concept is the privacy budget, which can be chosen by the data curator.

This privacy budget represents the measurable amount of information exposed by the computation before the level of privacy is deemed unacceptable.

The benefit of this approach is that by providing a quantifiable measure, there can be a guarantee about how “private” the release of information is. However, in practice, relating the actual privacy to the computation in question can be difficult: i.e. how private is private enough? What will ε need to be? These are open questions in practice for practitioners when applying DP to an application.
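
To make the privacy-loss parameter concrete, the classic mechanism for a counting query adds Laplace noise with scale 1/ε. The toy sketch below illustrates the concept only; integrate.ai applies differential privacy during model training (DP-SGD), not to simple counts:

import numpy as np

def private_count(values, epsilon):
    # a counting query has sensitivity 1 (adding or removing one person
    # changes the count by at most 1), so Laplace(0, 1/epsilon) noise suffices
    true_count = len(values)
    noise = np.random.laplace(loc=0.0, scale=1.0 / epsilon)
    return true_count + noise

ages = [23, 45, 51, 38, 62]
over_40 = [a for a in ages if a > 40]
print(private_count(over_40, epsilon=1.0))  # a noisy value near the true count of 3

Smaller values of ε add more noise (more privacy, less accuracy); larger values add less noise.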

How is Differential Privacy used in integrate.ai?

Users can add Differential Privacy to any model built in integrate.ai. DP parameters can be specified during session creation, within the model configuration.

When is the right time to use Differential Privacy?

Overall, differential privacy can be best applied when there is a clear ability to select the correct ε for the computation, and/or it is acceptable to specify a large enough privacy loss budget to satisfy the computation needs.

PRL Privacy for VFL

Private record linkage is a secure method for determining the overlap between datasets.

In vertical federated learning (VFL), the datasets shared by any two parties must have some overlap and alignment in order to be used for machine learning tasks. There are typically two main problems:

  1. The identifiers between the two datasets are not fully overlapped.
  2. The rows of the filtered, overlapped records for the datasets are not in the same order.

To resolve these differences while maintaining privacy, integrate.ai applies private record linkage (PRL), which consists of two steps: determining the overlap (or intersection) and aligning the rows.

Private Set Intersection

First, Private Set Intersection (PSI) is used to determine the overlap without storing the raw data centrally, or exposing it to any party. PSI is a privacy-preserving technique that is considered a Secure multiparty computation (SMPC) technology. This type of technology uses cryptographic techniques to enable multiple parties to perform operations on disparate datasets without revealing the underlying data to any party.

Additionally, integrate.ai does not store the raw data on a server. Instead, the parties submit the paths to their datasets. integrate.ai uses the ID columns of the datasets to produce a hash locally that is sent to the server for comparison. The secret for this hash is shared only through Diffie–Hellman key exchange between the clients - the server has no knowledge of it.
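
A toy sketch of that hashing step follows. It is illustrative only: the real protocol is more involved, but it shows how two parties can compare keyed hashes of their ID columns so that the server can intersect them without seeing the raw IDs. The literal shared secret below stands in for the key agreed through the Diffie–Hellman exchange:

import hmac
import hashlib

def hash_ids(ids, shared_secret):
    # each party hashes its ID column locally with the shared key;
    # only the hashes (the dictionary keys) are sent to the server
    return {hmac.new(shared_secret, i.encode(), hashlib.sha256).hexdigest(): i for i in ids}

shared_secret = b"agreed-via-diffie-hellman"  # illustrative placeholder

party_a = hash_ids(["id1", "id2", "id3"], shared_secret)
party_b = hash_ids(["id2", "id3", "id4"], shared_secret)

# the server intersects the hash sets without ever seeing the raw IDs
common_hashes = party_a.keys() & party_b.keys()

# each party maps the common hashes back to its own rows locally, which
# also establishes a shared ordering for the record alignment step below
print(sorted(party_a[h] for h in common_hashes))  # ['id2', 'id3']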

Private Record Alignment

Once the ID columns are compared, the server knows which dataset indices are common between the two sets and can align the rows. This step is the private record alignment portion of PRL. It enables machine learning tasks to be performed on the datasets.

For more information about running PRL sessions, see PRL Sessions.

Release Notes

21 April 2024: Release 9.9.0

This release introduces the following new features:

Known issues

Fixes:

19 Mar 2024: Enterprise-Networking Ready Task Runners

This release introduces the following new features:

Known issues:

Fixes:

12 Feb 2024: Role-based Access Control (RBAC)

This release introduces the following new features:

  1. Role-based access control with three new built-in roles:
     * Administrator - responsible for all aspects of managing the workspace
     * Model builder - responsible for running sessions and analyzing results
     * Data custodian - responsible for registering and maintaining dataset listings
  2. Added task level logs to the session details in the UI for ease of troubleshooting
  3. Added the version number of the release (and therefore the task runners/client/server) on the Session Details page
  4. Added example URLs in the UI to improve usability

Known issues:

Fixes:

19 Dec 2023: PCA, VFL-GBM, and Dataset Registration

This release provides several new features:

Version: 9.6.6

28 Aug 2023: Session Credit Usage

This release provides users with the ability to see their credit usage in their workspace UI. Each training or analytic session uses a certain number of credits from the user's allotment. This usage can now be monitored through a graph, with daily details. Users can also request additional credit when needed.

Version: 9.6.2

14 Aug 2023: Azure Task Runners

This release expands the Control Plane system architecture to include Microsoft Azure Task Runners.

Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environments, which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

For more information about task runners and control plane capabilities, see Using integrate.ai.

A tutorial for using Azure task runners is available here.

Version: 9.6.1

14 July 2023: AWS Task Runners

This release introduces the Control Plane system architecture and AWS Task Runners.

Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environments (such as AWS Batch and Fargate), which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.

For more information about task runners and control plane capabilities, see Using integrate.ai.

Version: 9.6.0

17 May 2023: 2D Histograms for EDA Intersect & Linear Inference

This release introduces two new features:

This release also introduces a single release version number that describes the whole release package. This release is version 9.5.0.

27 April 2023: PRL, VFL, and EDA Intersect

This release introduces the following new features:

Note: this release does not support Python 3.11 due to a known issue in that Python release.

Versions:

30 Jan 2023: User Authentication

This release added two new features:

Bug fixes:

Versions:

08 Dec 2022: Integration with AWS Fargate

This release introduces the ability to run an IAI training server on AWS Fargate through the integrate.ai SDK. With an integrate.ai training server running on Fargate, your data in S3 buckets, and clients running on AWS Batch, you can use the SDK to manage and run fully remote training sessions. A guide is available here.

Versions:

02 Nov 2022: Integration with AWS Batch

This release introduces the ability to run AWS Batch jobs through the integrate.ai SDK. Building on the previous release, which added support for data hosted in S3 buckets, you can now start and run a remote training session on remote data with jobs managed by AWS Batch. A guide is available here.

Features:

Note: Docker must be running for the version command to return a value.

BREAKING CHANGE:

Session status mapping in the SDK has been updated as follows:

Bug fixes:

Versions:

06 Oct 2022: Exploratory Data Analysis & S3 Support

Features:

Versions:

14 Sept 2022: Infrastructure upgrades for session abstraction

API Documentation

The latest integrate.ai API documentation is always available online:

API Documentation