Welcome to integrate.ai
Activate your data silos
We help organizations integrate AI to solve the world’s most important problems without risking the world’s most sensitive data. Safely collaborate on siloed data across organizations, jurisdictions, and environments to unlock mission critical analytics and AI use cases.
Unlock quality data at scale
Collaborate within and across organizations, jurisdictions, and environments without moving data
Reduce collaboration effort
Efficiently collaborate without copying, centralizing, or transferring data directly
Protect privacy & trust
Avoid compliance hurdles through a trusted platform with baked-in security, safety, and privacy
Developer tools for federated learning and analytics
Machine learning and analytics on distributed data require complex infrastructure and tooling that drain valuable developer resources. Integrate.ai abstracts away the complexity, making it easier than ever to integrate federated learning and analytics into your product.
Learn more about our flexible architecture options and expansive data science tooling.
System Overview
integrate.ai provides a robust and seamless system that removes the infrastructure and administration challenges of distributed machine learning and enables you to integrate it directly into your existing workflow. With robust privacy-enhancing technologies like federated learning and differential privacy, integrate.ai allows you to train models and run analytics across distributed data without compromising data privacy.
Our platform consists of the following components:
- The core IAI SaaS backend - includes federated learning aggregation capabilities, tenant management, control-plane system orchestration, and observability.
- A customer-facing web portal that supports general administration of the customer tenant.
- An SDK with a REST API that provides an interface into the centralized server. It also supports notebook-based ad hoc discovery and exploratory analysis.
Platform Architecture Overview
GETTING STARTED
Deployment Scenarios
The integrate.ai components can be deployed in different scenarios depending on your system needs.
For data scientists
integrate.ai offers a cloud-hosted, fully managed system for users focused on model building, training, and analysis. In this deployment scenario, the integrate.ai system manages all components hosted in the customer's environment using a limited permission role granted by the customer.
Create a task runner through the UI, and use it to perform the required training sessions without having to configure additional infrastructure.
For more information, see Using integrate.ai.
For developers
For those who are interested in integrating the system with their own product, or writing software that makes use of integrate.ai capabilities, there are several options, including a RESTful API and full-featured SDK.
For instructions on installing the SDK and necessary components for local development, see Installing the SDK.
Data Requirements
To run a training session, the data must be prepared according to the following standards:
- The data configuration dictated in the Session configuration
- The data requirements given below for running a model with integrate.ai
integrate.ai supports tabular data for standard models. You can create custom dataloaders for custom models, which allow image and video data to be provided as a folder instead of a flat file. For more information about dataloaders, see Building a custom model.
Custom models can also be used for feature engineering or preprocessing.
GLM and FFNet Model Data Requirements
integrate.ai currently supports horizontal federated learning (HFL) and vertical federated learning (VFL) to train models across different siloed datasets (or data silos) without transferring data between each silo.
To train an HFL model, datasets within each data silo that will participate in an HFL training session need to be prepared according to the following requirements:
- Data should be in Apache Parquet format (recommended) or CSV format, or referenced by S3 URLs (see below).
- For custom models, the data can be in either a single file OR folder format.
- Data must be fully feature engineered. Specifically, the data frame has to be ready to be used as an input for a neural network. This means that:
  - All columns must be numerical.
  - Columns must not contain NULL values (missing values must be imputed).
  - Categorical variables must be properly encoded (for example, by one-hot encoding).
  - Continuous variables must be normalized to have mean = 0 and std = 1.
- Feature engineering must be consistent across the silos. For example, if the datasets contain categorical values, such as postal codes, these values must be encoded the same way across all the datasets. For the postal code example, this means that the same postal code value translates to the same numerical values across all datasets that will participate in the training (see the pandas sketch after this list).
- Column names must be consistent across datasets. All column names (predictors and targets) must contain only alphanumeric characters (letters, numbers, dash -, underscore _) and start with a letter. You can select which columns you want to use in a specific training session.
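To make the postal code example concrete, here is a minimal pandas sketch of silo-side preprocessing. The column name, category values, and helper function are hypothetical illustrations, not part of the integrate.ai API:
import pandas as pd

# Categories agreed on by every silo in advance so the one-hot encoding is identical everywhere
SHARED_POSTAL_CODES = ["M5V", "H2X", "V6B"]

def encode_postal_code(df: pd.DataFrame) -> pd.DataFrame:
    """One-hot encode a hypothetical postal_code column using the shared category list."""
    codes = pd.Categorical(df["postal_code"], categories=SHARED_POSTAL_CODES)
    onehot = pd.get_dummies(codes, prefix="postal_code").astype(float)
    return pd.concat([df.drop(columns=["postal_code"]).reset_index(drop=True), onehot], axis=1)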
Remote datasets
One of the key features of the integrate.ai platform is the ability to work with datasets without having to colocate them. The integrate.ai client and SDK are capable of working with datasets that are hosted remotely on AWS S3. You must set up and configure the AWS CLI to use S3 datasets.
Required AWS Credentials
The following environment variables must be set for the iai client to be able to read S3 data locations. You configure these variables as part of setting up the AWS CLI. If you are generating temporary AWS credentials, specify them as in the example below. Otherwise, use the default profile credentials, or pass in a dict of AWS credential values.
import os

aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'SESSION_TOKEN': os.environ.get("AWS_SESSION_TOKEN"),
    'REGION': os.environ.get("AWS_REGION"),
}
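If you launch the iai client from Python (as in the examples below), you can also export these variables for the current process; child processes inherit them. A minimal sketch with placeholder values:
import os

# Placeholder values for illustration; child processes such as the iai client
# inherit these environment variables automatically.
os.environ["AWS_ACCESS_KEY_ID"] = "<access key>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<secret key>"
os.environ["AWS_SESSION_TOKEN"] = "<session token>"  # only needed for temporary credentials
os.environ["AWS_REGION"] = "<region>"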
S3 buckets as data paths
If the AWS CLI environment is properly configured, you can provide S3 URLs as the data_path in the SDK or in iai client commands. For example, the following code demonstrates setting the dataset path.
import subprocess

# IAI_TOKEN, session, filename, dataset_one, and dataset_two are placeholders;
# set them to match your environment before running.
data_path = "https://s3.<region>.amazonaws.com/pathtodata"

dataset_1 = subprocess.Popen(
    f"iai client --token {IAI_TOKEN} --session {session.id} --dataset-path {data_path}/{filename} --dataset-name {dataset_one} --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
dataset_2 = subprocess.Popen(
    f"iai client --token {IAI_TOKEN} --session {session.id} --dataset-path {data_path}/{filename} --dataset-name {dataset_two} --remove-after-complete",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
See AWS Batch Tutorial for an example that includes using data hosted in S3 buckets.
USING INTEGRATE.AI
integrate.ai provides a safe data collaboration platform that enables data science leaders to activate data silos by moving models, not data. It democratizes usage of the cloud by taking care of all the computational aspects of deployment - from container orchestration and deployment to security and failovers. By harnessing the power of cloud computing and AI, we enable data scientists and R&D professionals with limited-to-no computational and machine learning training to analyze and make sense of their data in the fastest and safest way possible.
The image below illustrates the high-level system architecture and how the managed environments interact.
The task runners use the serverless capabilities of cloud environments (such as Batch and Fargate). This greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.
AWS Environments
AWS Task Runner Configuration
Before you get started using integrate.ai in your cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud, by creating a limited permission Role in AWS for the provisioner and for the runtime agent. This is a one-time process - once created, you can use these roles for any task runners in your environment.
This section walks you through the required configuration.
- Create a provisioner role and policy.
- Create a runtime role and policy.
- Create a task runner.
- Use the task runner in a notebook to perform training tasks.
Create AWS IAM Provisioner Role and Policy
# Provisioner Custom trust policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::919740802015:role/iai-taskrunner-provision-<workspace-name>-prod-batch-ecs-task-role"
]
},
"Action": [
"sts:AssumeRole",
"sts:TagSession"
],
"Condition": {}
}
]
}
You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a limited permission Role in AWS for the provisioner.
Provisioner Role and policy
This policy lists all of the required permissions for integrate.ai to create the necessary infrastructure. Once your task runner is online, you can disable this role for extra security. However, disabling this role prevents you from creating new task runners and deleting existing ones.
The provisioner creates the following components and performs the required related tasks:
- Configures AWS Batch infrastructure, including creating roles and policies
- Configures AWS Fargate infrastructure, including creating roles and policies
- Creates a VPC and compute environment - all compute runs in the VPC created for the task runner
- Creates an S3 bucket that is encrypted with a customer key created by the provisioner
- Pulls the required client and server container images from an integrate.ai ECR (Elastic Container Registry)
To create the provisioner role and policy:
- In the AWS Console, go to IAM, select Policies, and click Create policy.
- On the Specify permissions page, click the JSON tab.
- Paste in the Provisioner JSON policy provided below by integrate.ai. Do not edit this policy.
Provisioner JSON policy - do not edit:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "IAMPermissions", "Effect": "Allow", "Action": [ "iam:CreateInstanceProfile", "iam:GetInstanceProfile", "iam:RemoveRoleFromInstanceProfile", "iam:DeleteInstanceProfile", "iam:AddRoleToInstanceProfile", "iam:CreatePolicy", "iam:CreateRole", "iam:GetPolicy", "iam:GetRole", "iam:GetPolicyVersion", "iam:ListRolePolicies", "iam:ListAttachedRolePolicies", "iam:ListPolicyVersions", "iam:ListInstanceProfilesForRole", "iam:DeletePolicy", "iam:DeleteRole", "iam:AttachRolePolicy", "iam:PassRole", "iam:DetachRolePolicy" ], "Resource": "*" }, { "Sid": "BatchPermissions", "Effect": "Allow", "Action": [ "batch:RegisterJobDefinition", "batch:DeregisterJobDefinition", "batch:CreateComputeEnvironment", "batch:UpdateComputeEnvironment", "batch:DeleteComputeEnvironment", "batch:UpdateJobQueue", "batch:CreateJobQueue", "batch:DeleteJobQueue", "batch:DescribeComputeEnvironments", "batch:DescribeJobDefinitions", "batch:DescribeJobQueues" ], "Resource": "*" }, { "Sid": "CloudWatchPermissions", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups", "logs:ListTagsLogGroup", "logs:CreateLogGroup", "logs:DescribeLogGroups", "logs:DeleteLogGroup", "logs:TagResource" ], "Resource": "*" }, { "Sid": "ECSFargatePermissions", "Effect": "Allow", "Action": [ "ecs:CreateCluster", "ecs:DescribeClusters", "ecs:DeleteCluster", "ecs:RegisterTaskDefinition", "ecs:DescribeTaskDefinition", "ecs:DeregisterTaskDefinition" ], "Resource": "*" }, { "Sid": "KmsPermissions", "Effect": "Allow", "Action": [ "kms:ListAliases", "kms:CreateKey", "kms:CreateAlias", "kms:DescribeKey", "kms:GetKeyPolicy", "kms:GetKeyRotationStatus", "kms:ListResourceTags", "kms:ScheduleKeyDeletion", "kms:CreateGrant", "kms:ListGrants", "kms:RevokeGrant", "kms:DeleteAlias" ], "Resource": "*" }, { "Sid": "S3CreatePermissions", "Effect": "Allow", "Action": [ "s3:CreateBucket", "s3:DeleteBucket", "s3:DeleteBucketPolicy", "s3:PutBucketVersioning", "s3:PutBucketPublicAccessBlock", "s3:PutBucketVersioning", "s3:PutEncryptionConfiguration" ], "Resource": "*" }, { "Sid": "S3ReadPermissions", "Effect": "Allow", "Action": [ "s3:GetBucketCors", "s3:GetBucketPolicy", "s3:PutBucketPolicy", "s3:GetBucketWebsite", "s3:GetBucketVersioning", "s3:GetLifecycleConfiguration", "s3:GetAccelerateConfiguration", "s3:GetBucketRequestPayment", "s3:GetBucketLogging", "s3:GetBucketPublicAccessBlock", "s3:GetBucketAcl", "s3:GetBucketObjectLockConfiguration", "s3:GetReplicationConfiguration", "s3:GetBucketTagging", "s3:GetEncryptionConfiguration", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::*integrate.ai", "arn:aws:s3:::*integrate.ai/*" ] }, { "Sid": "VPCCreatePermissions", "Effect": "Allow", "Action": [ "ec2:CreateVpc", "ec2:CreateTags", "ec2:AllocateAddress", "ec2:ReleaseAddress", "ec2:CreateSubnet", "ec2:ModifySubnetAttribute", "ec2:RevokeSecurityGroupEgress", "ec2:RevokeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupIngress", "ec2:AuthorizeSecurityGroupEgress", "ec2:CreateRouteTable", "ec2:CreateRoute", "ec2:CreateInternetGateway", "ec2:AttachInternetGateway", "ec2:AssociateRouteTable", "ec2:ModifyVpcAttribute", "ec2:CreateSecurityGroup", "ec2:CreateNatGateway" ], "Resource": "*" }, { "Sid": "VPCDescribePermissions", "Effect": "Allow", "Action": [ "ec2:DescribeVpcs", "ec2:DescribeSubnets", "ec2:DescribeSubnets", "ec2:DescribeVpcAttribute", "ec2:DescribeVpcClassicLinkDnsSupport", "ec2:DescribeVpcClassicLink", "ec2:DescribeInternetGateways", "ec2:DescribeSecurityGroups", "ec2:DescribeSecurityGroupRules", "ec2:DescribeRouteTables", 
"ec2:DescribeNetworkAcls", "ec2:DescribeNetworkInterfaces", "ec2:DescribeNatGateways", "ec2:DescribeAddresses" ], "Resource": "*" }, { "Sid": "VPCDeletePermissions", "Effect": "Allow", "Action": [ "ec2:DeleteSubnet", "ec2:DisassociateRouteTable", "ec2:DeleteSecurityGroup", "ec2:DeleteRoute", "ec2:DeleteNatGateway", "ec2:DeleteRouteTable", "ec2:DisassociateAddress", "ec2:DetachInternetGateway", "ec2:DeleteInternetGateway", "ec2:DeleteVpc" ], "Resource": "*" } ] }Click Next.
- Name the policy and click Create policy.
- In the left navigation bar, select Roles, and click Create role.
- Select Custom trust policy.
- Paste in the custom trust relationship JSON provided by integrate.ai (shown above).
- Replace workspace-name with the value for your workspace provided by integrate.ai.
- Click Next.
- On the Add permissions page, search for and select the policy you just created.
- Click Next.
- Provide the following Role name: iai-taskrunner-provisioner. Important: Do not edit or change this name.
- Click Create role.
- Copy and save the ARN for the provisioner role. It is required for creating a task runner.
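If you prefer to script these steps instead of using the console, a hedged boto3 sketch is shown below. The local file names are placeholders, and the JSON documents must be the unedited ones provided by integrate.ai:
import json
import boto3

iam = boto3.client("iam")

# Placeholder file names for the integrate.ai policy and trust-policy JSON
with open("provisioner-policy.json") as f:
    policy_doc = f.read()
with open("provisioner-trust.json") as f:
    trust_doc = f.read()

policy = iam.create_policy(
    PolicyName="iai-taskrunner-provisioner-policy",
    PolicyDocument=policy_doc,
)
role = iam.create_role(
    RoleName="iai-taskrunner-provisioner",  # do not change this name
    AssumeRolePolicyDocument=trust_doc,
)
iam.attach_role_policy(
    RoleName="iai-taskrunner-provisioner",
    PolicyArn=policy["Policy"]["Arn"],
)

print(role["Role"]["Arn"])  # save this ARN; it is required when creating a task runner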
Create AWS IAM Runtime Role and Policy
You must grant integrate.ai access to run the task runner in your cloud environment by creating a limited permission Role in AWS for the runtime agent.
Runtime Role and policy:
# Runtime Custom trust policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::919740802015:role/IAI-API-Instance-<workspace-name>-prod"
]
},
"Action": [
"sts:AssumeRole",
"sts:TagSession"
],
"Condition": {}
}
]
}
The runtime role is used to run training tasks. For increased security, it requires far fewer permissions than the provisioner role.
To create the runtime policy and role:
- In the AWS Console, go to IAM, select Policies, and click Create policy.
- On the Specify permissions page, click the JSON tab.
- Paste in the Runtime JSON policy provided below by integrate.ai. Do not edit this policy.
Runtime JSON policy - do not edit:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowBatchDescribeJobs", "Effect": "Allow", "Action": [ "batch:DescribeJobs", "batch:TagResource" ], "Resource": "*" }, { "Sid": "AllowBatchAccess", "Effect": "Allow", "Action": [ "batch:TerminateJob", "batch:SubmitJob", "batch:TagResource" ], "Resource": [ "arn:aws:batch:*:*:job-definition/iai-fl-client-batch-job-*", "arn:aws:batch:*:*:job-queue/iai-fl-client-batch-job-queue-*", "arn:aws:batch:*:*:job/*" ] }, { "Sid": "AllowECSUpdateAccess", "Effect": "Allow", "Action": [ "ecs:DescribeContainerInstances", "ecs:DescribeTasks", "ecs:ListTasks", "ecs:UpdateContainerAgent", "ecs:StartTask", "ecs:StopTask", "ecs:RunTask" ], "Resource": [ "arn:aws:ecs:*:*:cluster/iai-fl-server-ecs-cluster-*", "arn:aws:ecs:*:*:task/iai-fl-server-ecs-cluster-*", "arn:aws:ecs:*:*:task-definition/iai-fl-server-fargate-job-*" ] }, { "Sid": "AllowECSReadAccess", "Effect": "Allow", "Action": [ "ecs:DescribeTaskDefinition" ], "Resource" : ["*"] }, { "Sid": "AllowPassRole", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::*:role/iai-fl-server-fargate-task-role-*-*", "arn:aws:iam::*:role/iai-fl-server-fargate-execution-role-*-*" ] }, { "Sid": "AllowSaveTokens", "Effect": "Allow", "Action": [ "ssm:PutParameter", "ssm:DescribeParameters", "ssm:GetParameters", "ssm:GetParameter" ], "Resource": [ "arn:aws:ssm:*:*:parameter/fl-server-*-token", "arn:aws:ssm:*:*:parameter/fl-client-*-token" ] }, { "Sid": "AllowS3Access", "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetObject", "s3:GetObjectAcl", "s3:GetObjectVersion", "s3:ListBucketVersions" ], "Resource": [ "arn:aws:s3:::*.integrate.ai", "arn:aws:s3:::*.integrate.ai/*" ] }, { "Sid": "AllowKMSUsage", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:Encrypt", "kms:GenerateDataKey" ], "Resource": "*", "Condition": { "ForAnyValue:StringLike": { "kms:ResourceAliases": "alias/iai/*" } } }, { "Sid": "AllowLogReading", "Effect": "Allow", "Action": [ "logs:Describe*", "logs:List*", "logs:StartQuery", "logs:StopQuery", "logs:TestMetricFilter", "logs:FilterLogEvents", "logs:Get*" ], "Resource": [ "arn:aws:logs:*:*:log-group:iai-fl-server-fargate-log-group-*:log-stream:ecs/fl-server-fargate/*", "arn:aws:logs:*:*:log-group:/aws/batch/job:log-stream:iai-fl-client-batch-job-*" ] } ] }Click Next.
- Name the policy and click Create policy.
- In the left navigation bar, select Roles, and click Create role.
- Select Custom trust policy.
- Paste in the custom trust relationship JSON provided by integrate.ai (shown above).
- Replace workspace-name with the value for your workspace provided by integrate.ai.
- Click Next.
- On the Add permissions page, search for and select the policy you just created.
- Click Next.
- Provide the following Role name: iai-taskrunner-runtime. Important: Do not edit or change this name.
- Click Create role.
- Copy and save the ARN for the runtime role. It is required for creating a task runner.
Role and policy configuration is now complete.
Create an AWS Task Runner
Task runners simplify the process of running training sessions on your data.
Note: before attempting to create a task runner, ensure you have completed the AWS Task Runner Configuration.
To create an AWS task runner:
- Log in to your integrate.ai workspace.
- Click your name in the upper right corner to access the menu.
- Click Settings.
- In the left-hand navigation, under Workspace, click Task Runners.
- Click Connect new to start creating a task runner.
- Select the service provider - Amazon Web Services.
- Provide the following information:
- Task runner name - must be unique and no more than 16 characters; no capital letters or special characters are permitted
- Provisioner Role ARN - must be a valid ARN
- Runtime Role ARN - must be a valid ARN
- Region - select from the list
- vCPU size - specify the number of virtual CPUs
- Memory size (MB) - specify the memory size in megabytes (MB); at least 1024 is recommended
- Click Save.
- Wait for the status to change to Online. Note that you may have to refresh the page to see the updated status.
Q: What if task runner creation fails?
A: Delete the task runner and try again. Make sure that you have specified the correct ARN for both the provisioner and runtime roles with the integrate.ai policy applied.
After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there's no need to create a new one for each task.
Running a training task with an AWS Task Runner
The integrateai_taskrunner.ipynb notebook provides examples of how to use an AWS task runner for different federated learning tasks.
You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client, as the Task Runner performs the Client operations for you.
Download the notebook here, and the sample data here.
Microsoft Azure Environments
Azure Task Runner Configuration
Before you get started using integrate.ai in your Microsoft Azure cloud for training sessions, there are a few configuration steps that must be completed. You must grant integrate.ai permission to deploy task runner infrastructure in your cloud, by creating a dedicated resource group and a limited permission service principal for provisioning a task runner. The provisioner automatically creates a second service principal to execute Azure tasks using the task runner. This is a one-time process - once created, you can use this infrastructure for any task runners in your environment.
This section walks you through the required configuration.
- Create a Resource Group and provisioner service principal.
- Create a task runner.
- Use the task runner in a notebook to perform training tasks.
Create an Azure Resource Group & Service Principal
# Example Minimum Permission Policy Requirements for the Provisioner Service Principal
{
"properties": {
"roleName": "iai-provisioner",
"description": "Provision new task runner",
"assignableScopes": ["/"],
"permissions": [
{
"actions": [
"*/read",
"Microsoft.Authorization/roleAssignments/write",
"Microsoft.Storage/storageAccounts/write",
"Microsoft.Storage/storageAccounts/listKeys/action",
"Microsoft.Storage/storageAccounts/delete"
],
"notActions": [],
"dataActions": [],
"notDataActions": []
}
]
}
}
You must grant integrate.ai access to deploy task runner infrastructure in your cloud, by creating a dedicated resource group and service principal for the provisioner agent. A dedicated resource group is required for integrate.ai to provision resources into. You must provide the credentials for the service principal as part of the task runner creation process.
In order to provide all the necessary permissions, the user who creates the resource group and provisioner service principal must be an Azure AD Administrator.
Internally, integrate.ai will create a second service principal with reduced permissions to run tasks within the previously created resource group.
Permission Requirements for Task Runner Provisioner
- The service principal must be granted Application Admin access in Azure Active Directory. This operation can only be performed by an Azure Admin. This privilege is required by the provisioner for the app registration, which issues a new service principal for running the task.
- The service principal must also be granted access in the resource group. The role can be the resource group owner; however, this is not recommended. Instead, use a restricted role according to the minimum policy requirements.
- If not already available, install the Azure CLI. For more information, see the Azure CLI documentation.
- At the command prompt, type:
az ad sp create-for-rbac --role="Owner" --scopes="/subscriptions/<your subscription ID>/resourcegroups/<your-resource-group>"
Make sure that you specify the correct resource group name.
- Copy the output and save it. This information is required to connect a new task runner. Note: The output includes credentials that you must protect.
- From the Azure AD dashboard, add the Application Administrator permission to the service principal.
# Example CLI output:
Creating 'Owner' role assignment under scope '/subscriptions/<subscription ID>/resourcegroups/test-resource-group'
The output includes credentials that you must protect. Be sure that you do not include these credentials in your code or check the credentials into your source control. For more information, see https://aka.ms/azadsp-cli
{
"appId": "<Client ID>",
"displayName": "azure-cli-2023-04-13-14-57-09",
"password": "<secret>",
"tenant": "<tenant ID>"
}
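For reference, the saved values correspond to the standard Azure service-principal credential fields. The sketch below is illustrative only and not part of the integrate.ai setup (you supply these values through the task runner form); it uses the azure-identity package, with the secret read from environment variables rather than hardcoded:
import os
from azure.identity import ClientSecretCredential

credential = ClientSecretCredential(
    tenant_id=os.environ["AZURE_TENANT_ID"],          # "tenant" in the CLI output
    client_id=os.environ["AZURE_CLIENT_ID"],          # "appId" in the CLI output
    client_secret=os.environ["AZURE_CLIENT_SECRET"],  # "password" in the CLI output
)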
The provisioner creates the following components and performs the required related tasks:
- Configures infrastructure
- Creates a task runner service principal for executing tasks
- Pulls the required client and server container images from an integrate.ai ECR (Elastic Container Registry)
Create an Azure Task Runner
Task runners simplify the process of running training sessions on your data.
Note: before attempting to create a task runner, ensure you have completed the Azure Task Runner Configuration.
To create an Azure task runner:
- Log in to your integrate.ai workspace.
- Click your name in the upper right corner to access the menu.
- Click Settings.
- In the left-hand navigation, under Workspace, click Task Runners.
- Click Connect new to start creating a task runner.
- Select the service provider - Microsoft Azure.
- Provide the following information:
- Task runner name - must be unique
- Region - select from the list
- Resource group - must be an existing dedicated resource group
- Service principal ID - the appId from the Azure CLI output of creating a service principal
- Service principal secret - the password from the Azure CLI output
- Subscription ID - the ID of your Microsoft Azure subscription; it can be found on the Azure dashboard
- Tenant ID - the tenant from the Azure CLI output of creating a service principal
- vCPU - the number of virtual CPUs the task runner is allowed to use
- Memory size (MB) - the amount of memory to allocate to the task runner
Click Save. Wait for the status to change to Online. Note that you may have to refresh the page to see the updated status.
After successfully creating a task runner, you can use it to perform training tasks. You can reuse the task runner; there's no need to create a new one for each task.
Running a training task with an Azure Task Runner
The integrateai_taskrunner_azure.ipynb notebook provides examples of how to use an Azure task runner for different federated learning tasks.
You must install the integrate.ai SDK locally in order to run the notebook examples. For instructions, see Installing the SDK. Note that you do not need to install the integrate.ai Client, as the Task Runner performs the Client operations for you.
Download the notebook here, and the sample data here.
Developers - Installing the SDK
To run the integrate.ai SDK samples and build models, ensure that your environment is configured properly.
Required software:
- Python 3.8 (or later)
- Pip 22.2 (or later)
- Docker
To run the sample notebook, you may also need:
- Jupyter notebook (or other code tool such as VS Code)
Install integrate.ai SDK and Client
Generate an access token
To install the client and SDK, you must generate an access token through the web portal.
- Log in to your workspace through the portal.
- On the Dashboard, click Generate Access Token.
- Copy the access token and save it to a secure location.
Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.
Download the sample notebooks and dataset
You can download the sample Jupyter notebooks from integrate.ai's sample repository.
Download the sample dataset to use with the end user tutorials.
The sample includes four files:
- test.parquet - test data, to be used when joining a session
- train_silo0.parquet - training data for dataset 1
- train_silo1.parquet - training data for dataset 2
- train_centralized.parquet - all training data in one file
Install integrate.ai packages
- At a command prompt on your machine, run the following command to install the management tool for the SDK and client:
pip install integrate-ai
- Install the SDK package using the access token you generated.
iai sdk install --token <IAI_TOKEN>
Note: If you are using a Task Runner, you do not need to install the client or server.
- Install the integrate.ai client using the same access token. The client is a Docker image that runs in a container.
iai client pull --token <IAI_TOKEN>
- For local development, install the integrate.ai server using the same access token. The server is a Docker image that runs in a container.
iai server pull --token <IAI_TOKEN>
Optional: If you are building a model for data that includes images or video, follow the steps for Setting up a Docker GPU Environment. Otherwise, you can skip the following section.
Updating your environment
When a new version of an integrate.ai package is released, follow these steps to update your development environment.
Check version numbers
From a command prompt or terminal, run pip show integrate-ai to see the installed version of the integrate-ai command line interface (CLI).
To check the client version, run iai client version.
To check the SDK version, run iai sdk version.
You can compare these versions against the feature release versions in the Release Notes to ensure that your environment is up to date.
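As a quick sanity check, you can also read the installed CLI package version from Python. This is a minimal sketch assuming the distribution name used by pip install above; the SDK package installed by iai sdk install may use a different name, which you can confirm with pip list:
from importlib.metadata import version

# Equivalent to `pip show integrate-ai` for the version field
print("integrate-ai", version("integrate-ai"))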
Get the latest packages
Run the following command to update the CLI and all dependent packages.
pip install -U integrate-ai
Use the following command to update the client:
iai client pull
END USER TUTORIALS
Tutorial - FFNet Model Training with a Sample Local Dataset (iai_ffnet)
To help you get started with federated learning in the integrate.ai system, we've provided a tutorial based on synthetic data with pre-built configuration files that you can run on your local machine, or using a task runner. In this tutorial, you will train a federated feedforward neural network (iai_ffnet) using data from two datasets. The datasets, model, and data configuration are provided for you.
The sample notebook (integrateai_api.ipynb) contains runnable code snippets for exploring the SDK and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.
Tip: if you are running the example locally, make sure you have completed the Environment Setup.
Open the integrateai_api.ipynb notebook to test the code as you walk through this exercise.
Understanding Models
model_config = {
    "experiment_name": "test_synthetic_tabular",
    "experiment_description": "test_synthetic_tabular",
    "strategy": {
        "name": "FedAvg",  # Name of the federated learning strategy
        "params": {}
    },
    "model": {  # Parameters specific to the model type
        "params": {
            "input_size": 15,
            "hidden_layer_sizes": [6, 6, 6],
            "output_size": 2
        }
    },
    "balance_train_datasets": False,  # Performs undersampling on the dataset
    "ml_task": {  # Specifies the machine learning task and its parameters
        "type": "classification",
        "params": {
            "loss_weights": None,
        },
    },
    "optimizer": {
        "name": "SGD",  # Name of the PyTorch optimizer used
        "params": {
            "learning_rate": 0.2,
            "momentum": 0.0
        }
    },
    "differential_privacy_params": {  # Defines the differential privacy parameters
        "epsilon": 4,
        "max_grad_norm": 7
    },
    "save_best_model": {
        "metric": "loss",  # To disable this and save the model from the last round, set to None
        "mode": "min",
    },
}
integrate.ai has several standard model classes available, including:
- Feedforward Neural Nets (iai_ffn) - uses the same activation for each hidden layer.
- Generalized Linear Models (iai_glm) - uses a linear feedforward layer.
- Gradient Boosted Models (iai_gbm) - uses the sklearn implementation of HistGradientBoostingModels.
- Linear Inference Models (iai_linear_inference) - performs statistical inference on model coefficients for linear and logistic regression.
Model configuration
These standard models are defined using JSON configuration files during session creation. The model configuration (model_config) is a JSON object that contains the model parameters for the session.
There are five main properties with specific key-value pairs used to configure the model:
- strategy - Select one of the available federated learning strategies from the strategy library.
- model - Defines the specific parameters required for the model type.
- ml_task - Defines the machine learning task and associated parameters.
- optimizer - Defines the parameters for the PyTorch optimizer.
- differential_privacy_params - Defines the differential privacy parameters. See Differential Privacy for more information.
The example in the notebook is a model provided by integrate.ai. For this tutorial, you do not need to change any of the values.
Data configuration
data_config = {
"predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
"target": "y",
}
The data configuration is a JSON object where the user specifies predictor and target columns that are used to describe input data. This is the same structure for both GLM and FNN.
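Before joining a session, it can be useful to sanity-check a local file against this data configuration. A minimal pandas sketch, assuming the sample dataset path used later in this tutorial:
import os
import pandas as pd

# Assumes the sample dataset downloaded earlier in this guide
df = pd.read_parquet(os.path.expanduser("~/Downloads/synthetic/train_silo0.parquet"))
columns = data_config["predictors"] + [data_config["target"]]

missing = [c for c in columns if c not in df.columns]
assert not missing, f"missing columns: {missing}"
assert not df[columns].isnull().any().any(), "NULL values present"
assert all(pd.api.types.is_numeric_dtype(df[c]) for c in columns), "non-numeric column found"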
Once you have created or updated the model and data configurations, the next step is to create a training session to begin working with the model and datasets.
Authenticate with your Access Token
import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")
from integrate_ai_sdk.api import connect
client = connect(token=IAI_TOKEN)
Use your access token to authenticate to the API client. The SDK simplifies the authentication process by providing a connect helper module. You import the connect helper from the SDK, provide your token, and authenticate to the client.
Set an environment variable for your token (as described in Environment Setup) or replace it inline in the sample code.
Reminder: You can generate and manage tokens through the integrate.ai web portal.
Create and start the training session
session = client.create_fl_session(
name="Testing notebook",
description="I am testing session creation through a notebook",
min_num_clients=2,
num_rounds=2,
package_name="iai_ffnet",
model_config=model_config,
data_config=data_config,
).start()
session.id
Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.
Create a new session each time you want to train a new model.
The code sample demonstrates creating and starting a session with two training datasets (specified as min_num_clients) and two rounds (num_rounds). It returns a session ID that you can use to track and reference your session.
The package_name specifies the federated learning model package - in this example, it is iai_ffnet; however, other packages are supported. See Model packages for more information.
Join clients to the session
import subprocess
data_path = "~/Downloads/synthetic"
#Join dataset one (silo0)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
shell=True
)
#Join dataset two (silo1)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
shell=True
)
The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. Therefore, to run this example, you call subprocess.Popen twice to connect each dataset to the session as a separate client.
The session begins training after the minimum number of clients have joined the session.
Each client runs as a separate Docker container to simulate distributed data silos.
- data_path is the path to the sample data on your local machine
- IAI_TOKEN is your access token
- session.id is the ID returned by the previous step
- train-path is the path to and name of the sample dataset file
Poll for session results
import time
current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)
    # poll for status and round
    if current_status != session.status:
        print("Session status: ", session.status)
        current_status = session.status
    if current_round != session.round and session.round > 0:
        print("Session round: ", session.round)
        current_round = session.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()
print(
    "client_1 finished with return code: %d\noutput: %s\n %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)
Depending on the type of session and the size of the datasets, sessions may take some time to run. In the sample notebook and this tutorial, we poll the server to determine the session status.
You can log information about the session during this time. In this example, we are logging the current round and the clients that have joined the session.
Another popular option is to log session.metrics().as_dict() to view the in-progress training metrics.
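For example, once the session has finished, a minimal sketch for printing the per-round federated metrics (the dict structure matches the example outputs shown later in this guide):
# Print the loss recorded for each round of federated training
metrics = session.metrics().as_dict()
for i, round_metrics in enumerate(metrics.get("federated_metrics", [])):
    print(f"round {i}: {round_metrics}")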
Session Complete
Congratulations, you have your first federated model! You can test it by making predictions. For more information, see Making Predictions.
Tutorial - GLM Model Training with a Sample Local Dataset (iai_glm)
An end-to-end tutorial for how to train an existing model with a synthetic dataset on a local machine.
To help you get started, we provide a tutorial based on synthetic data with pre-built configuration files. In this tutorial, you will train a federated generalized linear model (iai_glm) using data from two datasets. The datasets, model, and data configuration are provided for you.
The sample notebook (integrateai_api.ipynb) contains runnable code snippets for exploring the SDK and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.
Before you get started, complete the Environment Setup for your local machine.
Open the integrateai_api.ipynb notebook to test the code as you walk through this exercise.
Understanding Models
model_config = {
    "experiment_name": "test_synthetic_tabular",
    "experiment_description": "test_synthetic_tabular",
    "strategy": {
        "name": "FedAvg",  # Name of the federated learning strategy
        "params": {}
    },
    "model": {  # Parameters specific to the model type
        "params": {
            "input_size": 15,
            "output_activation": "sigmoid"
        }
    },
    "balance_train_datasets": False,  # Performs undersampling on the dataset
    "ml_task": {
        "type": "logistic",
        "params": {
            "alpha": 0,
            "l1_ratio": 0
        }
    },
    "optimizer": {
        "name": "SGD",
        "params": {
            "learning_rate": 0.05,
            "momentum": 0.9
        }
    },
    "differential_privacy_params": {
        "epsilon": 4,
        "max_grad_norm": 7
    },
}
integrate.ai has a standard model class available for Feedforward Neural Nets (iai_ffn) and Generalized Linear Models (iai_glm). These standard models are defined using JSON configuration files during session creation.
The model configuration (model_config) is a JSON object that contains the model parameters for the session. There are five main properties with specific key-value pairs used to configure the model:
- strategy - Select one of the available federated learning strategies from the strategy library.
- model - Defines the specific parameters required for the model type. For GLM, output_activation should be set to the "inverse link function": for example, sigmoid for the logit link and exp for the log link (see the sketch after this list). Currently supported values include sigmoid, exp, and tanh.
- ml_task - Defines the machine learning task and associated parameters. The following values are supported: logistic, normal, poisson, gamma, and inverseGaussian. Choose the task based on the type of the target variable - for example, logistic for a binary target and poisson for counts.
- optimizer - Defines the parameters for the PyTorch optimizer.
- differential_privacy_params - Defines the differential privacy parameters. See Differential Privacy for more information.
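As a quick illustration of the inverse link functions mentioned above (standard GLM math, independent of the SDK): the logit link maps a mean in (0, 1) to the real line, so its inverse is the sigmoid; the log link's inverse is exp.
import math

def sigmoid(eta: float) -> float:
    # inverse of the logit link: maps the linear predictor back to a probability
    return 1.0 / (1.0 + math.exp(-eta))

print(sigmoid(0.0))   # 0.5 -> mean of a binary target when the linear predictor is 0
print(math.exp(0.0))  # 1.0 -> mean of a count target when the linear predictor is 0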
The example in the notebook is a model provided by integrate.ai. For this tutorial, you do not need to change any of the values.
data_config = {
"predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
"target": "y",
}
The data configuration is a JSON object where the user specifies predictor and target columns that are used to describe input data. This is the same structure for both GLM and FNN.
Once you have created or updated the model and data, the next step is to create a training session to begin working with the model and datasets.
Authenticate with your Access Token
import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")
from integrate_ai_sdk.api import connect
client = connect(token=IAI_TOKEN)
Use your access token to authenticate to the API client. The SDK simplifies the authentication process by providing a connect helper module. You import the connect helper from the SDK, provide your token, and authenticate to the client.
Set an environment variable for your token (as described in Environment Setup) or replace it inline in the sample code.
Reminder: You can generate and manage tokens through the integrate.ai web portal.
Create and start the training session
session = client.create_fl_session(
name="Testing notebook",
description="I am testing session creation through a notebook",
min_num_clients=2,
num_rounds=2,
package_name="iai_ffnet",
model_config=model_config,
data_config=data_config,
).start()
session.id
Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.
Create a session each time you want to train a new model.
The code sample demonstrates creating and starting a session with two training datasets (specified as min_num_clients) and two rounds (num_rounds). It returns a session ID that you can use to track and reference your session.
The package_name specifies the federated learning model package - in this example, it is iai_glm; however, other packages are supported. See Model packages for more information.
Join clients to the session
import subprocess
data_path = "~/Downloads/synthetic"
#Join dataset one (silo0)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
shell=True
)
#Join dataset two (silo1)
subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
shell=True
)
The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. Therefore, to run this example, you call subprocess.Popen twice to connect each dataset to the session as a separate client.
The session begins training once the minimum number of clients have joined the session.
Each client runs as a separate Docker container to simulate distributed data silos.
- data_path is the path to the sample data on your local machine
- IAI_TOKEN is your access token
- session.id is the ID returned by the previous step
- train-path is the path to and name of the sample dataset file
Poll for session results
import time
current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)
    # poll for status and round
    if current_status != session.status:
        print("Session status: ", session.status)
        current_status = session.status
    if current_round != session.round and session.round > 0:
        print("Session round: ", session.round)
        current_round = session.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()
print(
    "client_1 finished with return code: %d\noutput: %s\n %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)
Depending on the type of session and the size of the datasets, sessions may take some time to run. In the sample notebook and this tutorial, we poll the server to determine the session status.
You can log information about the session during this time. In this example, we are logging the current round and the clients that have joined the session.
Another popular option is to log session.metrics().as_dict() to view the in-progress training metrics.
Session Complete
Congratulations, you have your first federated model! You can test it by making predictions. For more information, see Making Predictions.
Linear Inference Sessions
An overview and example of a linear inference model for performing tasks such as GWAS in HFL.
The built-in model package iai_linear_inference trains a bundle of linear models for the target of interest against a specified list of predictors. It obtains the coefficients and variance estimates, and also calculates the p-values from the corresponding hypothesis tests. Linear inference is particularly useful for genome-wide association studies (GWAS), to identify genomic variants that are statistically associated with a risk for a disease or a particular trait.
This is a horizontal federated learning (HFL) model package.
The integrateai_linear_inference.ipynb notebook, available in the integrate.ai sample repository, demonstrates this model package using the sample data that is available for download here.
Follow the instructions in the Installing the integrate.ai SDK section to prepare a local test environment for this tutorial.
Overview of the iai_linear_inference package
# Example model_config for a binary target
model_config_logit = {
"strategy": {"name": "LogitRegInference", "params": {}},
"seed": 23, # for reproducibility
}
There are two strategies available in the package:
- LogitRegInference - for use when the target of interest is binary
- LinearRegInference - for use when the target is continuous
# Example data_config
data_config_logit = {
"target": "y",
"shared_predictors": ["x1", "x2"],
"chunked_predictors": ["x0", "x3", "x10", "x11"]
}
The data_config dictionary should include the following three fields:
- target: the target column of interest
- shared_predictors: predictor columns that should be included in all linear models (for example, confounding factors such as age and gender in GWAS)
- chunked_predictors: predictor columns that should be included in the linear model one at a time (for example, the gene expressions in GWAS)
Note: The columns in all the fields can be specified as either names/strings or indices/integers.
With this example data configuration, the session trains four logistic regression models, each with y as the target and with x1, x2 plus one of x0, x3, x10, x11 as predictors.
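A minimal sketch that enumerates the bundle implied by this data configuration (plain Python, no SDK calls):
# Each chunked predictor yields one model alongside the shared predictors
shared = data_config_logit["shared_predictors"]
for chunked in data_config_logit["chunked_predictors"]:
    print("y ~ " + " + ".join(shared + [chunked]))
# y ~ x1 + x2 + x0
# y ~ x1 + x2 + x3
# y ~ x1 + x2 + x10
# y ~ x1 + x2 + x11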
Create a linear inference training session
# Example training session
training_session_logit = client.create_fl_session(
name="Testing linear inference session",
description="I am testing linear inference session creation through a notebook",
min_num_clients=2,
num_rounds=5,
package_name="iai_linear_inference",
model_config=model_config_logit,
data_config=data_config_logit,
).start()
training_session_logit.id
For this example, there are two (2) training clients and the model is trained over five (5) rounds.
Argument (type) | Description
--- | ---
name (str) | Name to set for the session
description (str) | Description to set for the session
min_num_clients (int) | Number of clients required to connect before the session can begin
num_rounds (int) | Number of rounds of federated model training to perform
package_name (str) | Name of the model package to be used in the session
model_config (dict) | Contains the model configuration to be used for the session
data_config (dict) | Contains the data configuration to be used for the session
Ensure that you have downloaded the sample data. If you saved it to a location other than your Downloads folder, set data_path to the correct location.
Expected path:
data_path = "~/Downloads/synthetic"
Start the linear inference training session
import subprocess
client_1 = subprocess.Popen(
f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1-inference --remove-after-complete",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
client_2 = subprocess.Popen(
f"iai client train --token {IAI_TOKEN} --session {training_session_logit.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2-inference --remove-after-complete",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
This example demonstrates starting a training session locally. The clients joining the session are named client-1-inference and client-2-inference, respectively.
Poll for linear inference session status
import time
current_round = None
current_status = None
while client_1.poll() is None or client_2.poll() is None:
    output1 = client_1.stdout.readline().decode("utf-8").strip()
    output2 = client_2.stdout.readline().decode("utf-8").strip()
    if output1:
        print("silo1: ", output1)
    if output2:
        print("silo2: ", output2)
    # poll for status and round
    if current_status != training_session_logit.status:
        print("Session status: ", training_session_logit.status)
        current_status = training_session_logit.status
    if current_round != training_session_logit.round and training_session_logit.round > 0:
        print("Session round: ", training_session_logit.round)
        current_round = training_session_logit.round
    time.sleep(1)

output1, error1 = client_1.communicate()
output2, error2 = client_2.communicate()
print(
    "client_1 finished with return code: %d\noutput: %s\n %s"
    % (client_1.returncode, output1.decode("utf-8"), error1.decode("utf-8"))
)
print(
    "client_2 finished with return code: %d\noutput: %s\n %s"
    % (client_2.returncode, output2.decode("utf-8"), error2.decode("utf-8"))
)
Use a polling function, such as the example provided, to wait for the session to complete.
View training metrics and model details
# Example output
{'session_id': '3cdf4be992',
'federated_metrics': [{'loss': 0.6931747794151306},
{'loss': 0.6766608953475952},
{'loss': 0.6766080856323242},
{'loss': 0.6766077876091003},
{'loss': 0.6766077876091003}],
'client_metrics': [{'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_loss': 0.6931748060977674,
'test_accuracy': 0.4995,
'test_roc_auc': 0.5,
'test_num_examples': 4000},
'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6931748060977674,
'test_accuracy': 0.4995,
'test_roc_auc': 0.5,
'test_num_examples': 4000}},
{'user@integrate.ai:dedbb7e9be2046e3a49b28b0131c4b97': {'test_num_examples': 4000,
'test_loss': 0.6766608866775886,
'test_roc_auc': 0.5996664746664747,
'test_accuracy': 0.57625},
'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_num_examples': 4000,
'test_loss': 0.6766608866775886,
'test_accuracy': 0.57625,
'test_roc_auc': 0.5996664746664747}},
{'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_loss': 0.6766080602706078,
'test_accuracy': 0.5761875,
'test_num_examples': 4000,
...
'user@integrate.ai:339d50e453f244ed9cb2662ab2d3bb66': {'test_accuracy': 0.5761875,
'test_roc_auc': 0.5996632246632246,
'test_num_examples': 4000,
'test_loss': 0.6766078165060236}}],
'latest_global_model_federated_loss': 0.6766077876091003}
Once the session is complete, you can view the training metrics and model details, such as the model coefficients and p-values. In this example, because a bundle of models is trained, the metrics are the average values across all the models.
training_session_logit.metrics().as_dict()
Plot the metrics
training_session_logit.metrics().plot()
Example output:
Retrieve the trained model
The LinearInferenceModel object can be retrieved using the model's as_pytorch method. The relevant information, such as p-values, can be accessed directly from the model object.
model_logit = training_session_logit.model().as_pytorch()
Retrieve the p-values
The .p_values() method returns the p-values of the chunked predictors.
# Example of retrieving p-values
pv = model_logit.p_values()
pv
#Example p-value output:
x0 112.350396
x3 82.436540
x10 0.999893
x11 27.525280
dtype: float64
Summary information
The .summary() method fetches the coefficient, standard error, and p-value of the model corresponding to the specified predictor.
#Example of fetching summary
summary_x0 = model_logit.summary("x0")
summary_x0
Example summary output:
Making predictions from a linear inference session
from torch.utils.data import DataLoader
from integrate_ai_sdk.packages.LinearInference.dataset import ChunkedTabularDataset
ds = ChunkedTabularDataset(path=f"{data_path}/test.parquet", **data_config_logit)
dl = DataLoader(ds, batch_size=len(ds), shuffle=False)
x, _ = next(iter(dl))
y_pred = model_logit(x)
y_pred
You can also make predictions with the resulting bundle of models when the data is loaded by the ChunkedTabularDataset from the iai_linear_inference package. The predictions will be of shape (n_samples, n_chunked_predictors), where each column corresponds to one model from the bundle.
#Example prediction output:
tensor([[0.3801, 0.3548, 0.4598, 0.4809],
[0.4787, 0.3761, 0.4392, 0.3579],
[0.5151, 0.3497, 0.4837, 0.5054],
...,
[0.7062, 0.6533, 0.6516, 0.6717],
[0.3114, 0.3322, 0.4257, 0.4461],
[0.4358, 0.4912, 0.4897, 0.5110]], dtype=torch.float64)
MODEL TRAINING
Gradient Boosted Models (HFL-GBM)
Gradient boosting is a machine learning algorithm for building predictive models that helps minimize the bias error of the model. The gradient boosting model provided by integrate.ai is an HFL model that uses the sklearn implementation of HistGradientBoostingClassifier for classification tasks and HistGradientBoostingRegressor for regression tasks.
The GBM sample notebook (integrateai_api_gbm.ipynb) provides sample code for running the SDK, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.
Prerequisites
- Open the integrateai_api_gbm.ipynb notebook to test the code as you walk through this tutorial.
- Download the sample dataset to use with this tutorial. The sample files are:
  - train_silo0.parquet - training data for dataset 1
  - train_silo1.parquet - training data for dataset 2
  - test.parquet - test data, to be used when joining a session
Review the sample Model Configuration
data_schema = {
"predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
"target": "y",
}
model_config = {
"strategy": {
"name": "HistGradientBoosting",
"params": {}
},
"model": {
"params": {
"max_depth": 4,
"learning_rate": 0.05,
"random_state": 23,
"max_bins": 128,
"sketch_relative_accuracy": 0.001,
}
},
"ml_task": {"type": "classification", "params": {}},
"save_best_model": {
"metric": None,
"mode": "min"
},
}
integrate.ai has a model class available for Gradient Boosted Models, called iai_gbm. This model is defined using a JSON configuration file during session creation.
The strategy for GBM is HistGradientBoosting.
You can adjust the following parameters as needed:
- max_depth - Used to control the size of the trees.
- learning_rate - (shrinkage) Used as a multiplicative factor for the leaves values. Set this to one (1) for no shrinkage.
- max_bins - The number of bins used to bin the data. Using fewer bins acts as a form of regularization. It is generally recommended to use as many bins as possible.
- sketch_relative_accuracy - Determines the precision of the sketch technique used to approximate global feature distributions, which are used to find the best split for tree nodes.
Set the machine learning task type to either classification or regression. Specify any parameters associated with the task type in the params section.
The save_best_model option allows you to set the metric and mode for model saving. By default, the metric is set to None, which saves the model from the previous round, and the mode is set to min.
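For example, a hedged variant of this setting that keeps the model with the lowest reported loss instead of the model from the previous round; this assumes "loss" is a valid metric name for GBM sessions, which is an assumption rather than something confirmed here.
# Hypothetical alternative: keep the model with the lowest reported loss
# (assumes "loss" is a valid metric name for this session type)
model_config["save_best_model"] = {"metric": "loss", "mode": "min"}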
The notebook also provides a sample data schema. For the purposes of testing GBM, use the sample schema as shown.
For more about data schemas, see ....
Create a GBM training session
training_session = client.create_fl_session(
name="HFL session testing GBM",
description="I am testing GBM session creation through a notebook",
min_num_clients=2,
num_rounds=10,
package_name="iai_gbm",
model_config=model_config,
data_config=data_schema,
).start()
training_session.id
Federated learning models created in integrate.ai are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session.
Create a session each time you want to train a new model.
The code sample demonstrates creating and starting a session with two training clients (two datasets) and ten rounds (or trees). It returns a session ID that you can use to track and reference your session.
Join the training session
import subprocess
data_path = "~/Downloads/synthetic"
client_1 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo0.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-1 --remove-after-complete",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
client_2 = subprocess.Popen(f"iai client train --token {IAI_TOKEN} --session {training_session.id} --train-path {data_path}/train_silo1.parquet --test-path {data_path}/test.parquet --batch-size 1024 --client-name client-2 --remove-after-complete",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
The next step is to join the session with the sample data. This example has data for two datasets simulating two clients, as specified with the min_num_clients argument. The session begins training once the minimum number of clients have joined the session.
- data_path is the path to the sample data on your local machine
- IAI_TOKEN is your access token
- training_session.id is the ID returned by the previous step
- train-path is the path to and name of the sample dataset file
- test-path is the path to and name of the sample test file
- batch-size is the size of the batch of data
- client-name is the unique name for each client
Poll for Session Results
Sessions take some time to run. In the sample notebook and this tutorial, we poll the server to determine the session status.
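A minimal polling sketch is shown below. It assumes the training session object exposes status and round attributes, mirroring the EDA polling example later in this guide; treat it as an illustrative pattern rather than the only way to wait for completion.
import time

# Poll until the session is no longer running (illustrative sketch)
while training_session.status == "running":
    print(f"Session {training_session.id} - round: {training_session.round}")
    time.sleep(30)
print(f"Final session status: {training_session.status}")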
View the training metrics
# Retrieve the model metrics
training_session.metrics().as_dict()
After the session completes successfully, you can view the training metrics and start making predictions.
1. Retrieve the model metrics with .as_dict().
2. Plot the metrics.
Example output:
{'session_id': '9fb054bc24',
'federated_metrics': [{'loss': 0.6876291940808297},
{'loss': 0.6825978879332543},
{'loss': 0.6780059585869312},
{'loss': 0.6737175708711147},
{'loss': 0.6697578155398369},
{'loss': 0.6658972384035587},
{'loss': 0.6623568458259106},
{'loss': 0.6589517279565335},
{'loss': 0.6556690569519996},
{'loss': 0.6526266353726387},
{'loss': 0.6526266353726387}],
'rounds': [{'user info': {'test_loss': 0.6817054933875072,
'test_roc_auc': 0.6868823702288674,
'test_accuracy': 0.7061688311688312,
'test_num_examples': 8008},
'user info': {'test_accuracy': 0.5720720720720721,
'test_num_examples': 7992,
'test_roc_auc': 0.6637941733389123,
'test_loss': 0.6935647668830733}},
{'user info': {'test_accuracy': 0.5754504504504504,
'test_roc_auc': 0.6740578481919338,
'test_num_examples': 7992,
'test_loss': 0.6884922753070576},
'user info': {'test_loss': 0.6767152831608197,
...
'user info': {'test_loss': 0.6578156923815811,
'test_num_examples': 7992,
'test_roc_auc': 0.7210704078520924,
'test_accuracy': 0.6552802802802803}}],
'latest_global_model_federated_loss': 0.6526266353726387}
Plot the metrics
# Plot the metrics
fig = training_session.metrics().plot()
Example (image)
model = training_session.model().as_sklearn()
model
Example (image)
Load the test data
import pandas as pd
test_data = pd.read_parquet(f"{data_path}/test.parquet")
test_data.head()
Example (image)
Split the test data into features and target
Y = test_data["y"]
X = test_data[["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"]]
Run model predictions
model.predict(X)
Result: array([0, 1, 0, ..., 0, 0, 1])
from sklearn.metrics import roc_auc_score
y_hat = model.predict_proba(X)
roc_auc_score(Y, y_hat[:, 1])
Result: 0.7082738332738332
Note: When the training sample sizes are small, this model is more likely to overfit the training data.
Private Record Linkage (PRL) sessions
Private record linkage sessions create intersection and alignment among datasets to prepare them for vertical federated learning (VFL).
In a vertical federated learning process, two or more parties collaboratively train a model using datasets that share a set of overlapping features. These datasets generally each contain distinct data with some overlap. This overlap is used to define the intersection of the sets. Private record linkage (PRL) uses the intersection to create alignment between the sets so that a shared model can be trained.
Overlapping records are determined privately through a PRL session, which combines Private Set Intersection with Private Record Alignment.
For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for model training.
PRL Session Overview
In PRL, two parties submit paths to their datasets so that they can be aligned to perform a machine learning task.
- ID columns (id_columns) are used to produce a hash that is sent to the server for comparison. The secret for this hash is shared between the clients and the server has no knowledge of it. This comparison is the Private Set Intersection (PSI) part of PRL.
- Once compared, the server orchestrates the data alignment because it knows which indices of each dataset are in common. This is the Private Record Alignment (PRA) part of PRL.
For information about privacy when performing PRL, see PRL Privacy for VFL.
PRL Session Example
Use the integrateai_batch_client_prl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required.
This example uses AWS Fargate and Batch to run the session.
- Complete the Environment Setup.
- Ensure that you have the correct roles and policies for Fargate and Batch. See Running AWS Batch jobs with the SDK and Running a training server on AWS Fargate for details.
- Authenticate to the integrate.ai API client.
Create a configuration for the PRL session.
# Example data config
prl_data_config = {
"clients": {
"passive_client": {"id_columns": ["id"]},
"active_client": {"id_columns": ["id"],},
}
}
Specify a prl_data_config that indicates the columns to use as identifiers when linking the datasets to each other. The number of items in the config specifies the number of expected clients. In this example, there are two items and therefore two clients submitting data. Their datasets are linked by the "id" column in any provided datasets.
Create the session
# Create session example
prl_session = client.create_prl_session(
name="Testing notebook - PRL",
description="I am testing PRL session creation through a notebook",
data_config=prl_data_config
).start()
prl_session.id
To create the session, specify the data_config that contains the client names and columns to use as identifiers to link the datasets (for example, prl_data_config).
These client names are referenced for the compute on the PRL session and for any sessions that use the PRL session downstream.
Specify AWS parameters and credentials
# Example data paths in s3
active_train_path = 's3://<path to dataset>/active_train.csv'
passive_train_path = 's3://<path to dataset>/passive_train.csv'
active_test_path = 's3://<path to dataset>/active_test.csv'
passive_test_path = 's3://<path to dataset>/passive_test.csv'
# Specify the AWS parameters
cluster = "iai-server-ecs-cluster"
task_definition = "iai-server-fargate-job"
model_storage = "s3://<path to storage>"
security_group = "<security group name>"
subnet_id = "<subnet>" # Public subnet (routed via IGW)
job_queue='iai-client-batch-job-queue'
job_def='iai-client-batch-job'
# Example of using temporary credentials
aws_creds = {
'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
'REGION': os.environ.get("AWS_REGION"),
}
- Specify the paths to the datasets and the AWS Batch job information.
- Specify your AWS credentials if you are generating temporary ones. Otherwise, use the default profile credentials.
Create a task builder and task group
from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
# Creating task builder objects
task_server = taskbuilder_aws.fargate(
cluster=cluster,
task_definition=task_definition)
tb = taskbuilder_aws.batch(
job_queue=job_queue,
aws_credentials=aws_creds,
cpu_job_definition=job_def)
task_group_context = SessionTaskGroup(prl_session)\
.add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=client))\
.add_task(tb.prl(train_path=passive_train_path, test_path=passive_test_path, vcpus='2', memory='16384', client=client, client_name="passive_client"))\
.add_task(tb.prl(train_path=active_train_path, test_path=active_test_path, vcpus='2', memory='16384', client=client, client_name="active_client")).start()
- Set up the task builder and task group.
- Import the taskbuilder and taskgroup from the SDK.
- Specify the server and batch information to create the task builder objects.
- Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.
- Set the train_path, test_path, and client_name for each task. The client_name must be the same name as specified in the data_config file.
- Optional: Set the vcpus and memory parameters to override the Batch job definition.
Monitor submitted jobs
Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK. The following code returns the session ID that is included in the job name.
# session available in group context after submission
print(task_group_context.session.id)
Next, you can check the status of the tasks.
# status of tasks submitted
task_group_status = task_group_context.status()
for task_status in task_group_status:
print(task_status)
Submitted tasks are in the pending state until the clients join and the session is started. Once started, the status changes to running.
# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
task_group_context.wait(300)
View the overlap statistics
When the session is complete, you can view the overlap statistics for the datasets.
prl_session.metrics().as_dict()
Example result:
{'session_id': '07d0f8358d',
'federated_metrics': [],
'client_metrics': {'passive_client': {'train': {'n_records': 14400,
'n_overlapped_records': 12963,
'frac_overlapped': 0.9},
'test': {'n_records': 3600,
'n_overlapped_records': 3245,
'frac_overlapped': 0.9}},
'active_client': {'train': {'n_records': 14400,
'n_overlapped_records': 12963,
'frac_overlapped': 0.9},
'test': {'n_records': 3600,
'n_overlapped_records': 3245,
'frac_overlapped': 0.9}}}}
To run a VFL training session on the linked datasets, see VFL FFNet Model Training.
To perform exploratory data analysis on the intersection, see EDA Intersect.
VFL SplitNN Model Training
In a vertical federated learning (VFL) process, two or more parties collaboratively train a model using datasets that share a set of overlapping features. Each party has partial information about the overlapped subjects in the dataset. Therefore, before running a VFL training session, a private record linkage (PRL) session is performed to find the intersection and create alignment between datasets.
There are two types of parties participating in the training:
- The Active Party owns the labels, and may or may not also contribute data.
- The Passive Party contributes only data.
For example, in data sharing between a hospital (party B, the Active party) and a medical imaging centre (party A, the Passive party), only a subset of the hospital patients will exist in the imaging centre's data. The hospital can run a PRL session to determine the target subset for VFL model training.
VFL Session Overview
A hospital may have patient blood tests and outcome information on cancer, but imaging data is owned by an imaging centre. They want to collaboratively train a model for cancer diagnosis based on the imaging data and blood test data. The hospital (active party) would own the outcome and patient blood tests and the Imaging Centre (passive party) would own the imaging data.
A simplified model of the process is shown below.
integrate.ai VFL Flow
The following diagram outlines the training flow in the integrate.ai implementation of VFL.
VFL Training Session Example
Use the integrateai_fargate_batch_client_vfl.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required. You can download sample notebooks here.
The notebook demonstrates both the PRL session and the VFL train and predict sessions.
Note: This example uses AWS Fargate and Batch to run the session.
Complete the Environment Setup
model_config = {
"strategy": {"name": "SplitNN", "params": {}},
"model": {
"feature_models": {
"passive_client": {"params": {"input_size": 7, "hidden_layer_sizes": [6], "output_size": 5}},
"active_client": {"params": {"input_size": 8, "hidden_layer_sizes": [6], "output_size": 5}},
},
"label_model": {"params": {"hidden_layer_sizes": [5], "output_size": 2}},
},
"ml_task": {
"type": "classification",
"params": {
"loss_weights": None,
},
},
"optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
"seed": 23, # for reproducibility
}
data_config = {
"passive_client": {
"label_client": False,
"predictors": ["x1", "x3", "x5", "x7", "x9", "x11", "x13"],
"target": None,
},
"active_client": {
"label_client": True,
"predictors": ["x0", "x2", "x4", "x6", "x8", "x10", "x12", "x14"],
"target": "y",
},
}
- Ensure that you have the correct roles and policies for Fargate and Batch. See AWS Batch and Fargate Manual Setup for details.
- Authenticate to the integrate.ai API client.
- Run a PRL session to obtain the aligned dataset. This session ID is required for the VFL training session.
- Create a model_config and a data_config for the VFL session.
Parameters:
- strategy: Specify the name and parameters. For VFL, the strategy is SplitNN.
- model: Specify the feature_models and the label_model. feature_models refers to the part of the model that transforms the raw input features into intermediate encoded columns (usually hosted by both parties). label_model refers to the part of the model that connects the intermediate encoded columns to the target variable (usually hosted by the active party).
- ml_task: Specify the type of machine learning task and any associated parameters. Options are classification or regression.
- optimizer: Specify any optimizer supported by PyTorch.
- seed: Specify a number to use for reproducibility.
Create and start a VFL training session
vfl_train_session = client.create_vfl_session(
name="Testing notebook - VFL Train",
description="I am testing VFL Train session creation through a notebook",
prl_session_id=prl_session.id,
vfl_mode='train',
min_num_clients=2,
num_rounds=5,
package_name="iai_ffnet",
data_config=data_config,
model_config=model_config,
startup_mode="external"
).start()
vfl_train_session.id
Specify the PRL session ID and ensure that the vfl_mode is set to train.
Set up the task builder and task group for VFL training
vfl_task_group_context = SessionTaskGroup(vfl_train_session)\
.add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=client))\
.add_task(tb.vfl_train(train_path=passive_train_path, test_path=passive_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=model_storage, client=client, client_name="passive_client"))\
.add_task(tb.vfl_train(train_path=active_train_path, test_path=active_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=model_storage, client=client, client_name="active_client")).start()
In the sample notebook, the server and client task builders are set up in the PRL session workflow, so you only have to create the VFL task group.
If you are not using the notebook, ensure that you import the required packages from the SDK. Create a task in the task group for the server and for each client. The number of client tasks in the task group must match the number of clients specified in the data_config used to create the session.
The following parameters are required for each client task:
- train_path
- test_path
- batch_size
- storage_path
- client_name
The vcpus and memory parameters are optional overrides for the job definition.
Monitor submitted VFL training jobs
# session available in group context after submission
print(vfl_task_group_context.session.id)
# status of tasks submitted
vfl_task_group_status = vfl_task_group_context.status()
for task_status in vfl_task_group_status:
print(task_status)
# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
vfl_task_group_context.wait(300)
Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.
View the VFL training metrics
# Example results
{'session_id': '498beb7e6a',
'federated_metrics': [{'loss': 0.6927943530912943},
{'loss': 0.6925891094472265},
{'loss': 0.6921983339753467},
{'loss': 0.6920029462394067},
{'loss': 0.6915351291650617}],
'client_metrics': [{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.5286237121001411,
'test_num_examples': 3245,
'test_loss': 0.6927943530912943,
'test_accuracy': 0.5010785824345146}},
{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_num_examples': 3245,
'test_accuracy': 0.537442218798151,
'test_roc_auc': 0.5730010669487545,
'test_loss': 0.6925891094472265}},
{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_accuracy': 0.550693374422188,
'test_roc_auc': 0.6073282812853845,
'test_loss': 0.6921983339753467,
'test_num_examples': 3245}},
{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_loss': 0.6920029462394067,
'test_roc_auc': 0.6330078151716465,
'test_accuracy': 0.5106317411402157,
'test_num_examples': 3245}},
{'user@integrate.ai:79704ac8c1a7416aa381288cbab16e6a': {'test_roc_auc': 0.6495852274713467,
'test_loss': 0.6915351291650617,
'test_accuracy': 0.5232665639445301,
'test_num_examples': 3245}}]}
Once the session completes successfully, you can view the training metrics.
vfl_train_session.metrics().as_dict()
Plot the VFL training metrics.
fig = vfl_train_session.metrics().plot()
Example of plotted training metrics
VFL Prediction Session Example
This example continues the workflow in the previous sections: PRL Session Example and VFL Training Session Example.
Create and start a VFL prediction session
# Example configuration of a VFL predict session
vfl_predict_session = client.create_vfl_session(
name="Testing notebook - VFL Predict",
description="I am testing VFL Predict session creation through a notebook",
prl_session_id=prl_session.id,
training_session_id=vfl_train_session.id,
vfl_mode='predict',
data_config=data_config
).start()
vfl_predict_session.id
To create a VFL prediction session, specify the PRL session ID (prl_session_id) and the VFL train session ID (training_session_id) from your previous successful PRL and VFL sessions. Set the vfl_mode to predict.
Specify the full path for the storage location for your predictions, including the file name.
active_predictions_storage_path="<full path of predictions file name>"
Create and start a task group for the prediction session
# Example of setting up a task group
vfl_predict_task_group_context = SessionTaskGroup(vfl_predict_session)\
.add_task(task_server.fls(subnet_id, security_group, storage_path=model_storage, client=client))\
.add_task(tb.vfl_predict(client_name='active_client', dataset_path=active_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path=active_predictions_storage_path, client=client, raw_output=True))\
.add_task(tb.vfl_predict(client_name='passive_client', dataset_path=passive_test_path, vcpus='2', memory='16384', batch_size=1024, storage_path="passive_no_path", client=client, raw_output=True))\
.start()
Monitor submitted VFL prediction jobs
# Example of monitoring tasks
print(vfl_predict_task_group_context.session.id)
vfl_predict_task_group_status = vfl_predict_task_group_context.status()
for task_status in vfl_predict_task_group_status:
print(task_status)
vfl_predict_task_group_context.wait(300)
Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.
View VFL predictions
After the predict session completes successfully, you can view the predictions from the Active party and evaluate the performance.
import pandas as pd
df_pred = pd.read_csv(active_predictions_storage_path)
df_pred.head()
Example output:
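To evaluate the performance, one approach is the hedged sketch below. It assumes the predictions file written for the active party contains both a predicted-score column and the true label column; the names "pred" and "y" are placeholders rather than confirmed output column names, so adjust them to match your actual file.
from sklearn.metrics import roc_auc_score
import pandas as pd

# Hypothetical evaluation: "pred" and "y" are placeholder column names
df_pred = pd.read_csv(active_predictions_storage_path)
print(roc_auc_score(df_pred["y"], df_pred["pred"]))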
DATA ANALYSIS
EDA in Individual Mode
The Exploratory Data Analysis (EDA) feature for horizontal federated learning (HFL) enables you to access summary statistics about a group of datasets without needing access to the data itself. This allows you to get a basic understanding of the dataset when you don't have access to the data or you are not allowed to do any computations on the data.
EDA is an important pre-step for federated modelling and a simple form of federated analytics. The feature has a built in differential privacy setting. Differential privacy (DP) is dynamically added to each histogram that is generated for each feature in a participating dataset. The added privacy protection causes slight noise in the end result.
At a high level, the process is similar to that of creating and running a session to train a model. The steps are:
- Authenticate with your access token.
- Configure an EDA session.
- Create and start the session.
- Run the session and poll for session status.
- Analyze the datasets.
The sample notebook (integrate_ai_api.ipynb) provides runnable code examples for exploring the API, including the EDA feature, and should be used in parallel with this tutorial. This documentation provides supplementary and conceptual information to expand on the code demonstration.
API Reference for EDA
The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session.
If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.
This tutorial assumes that you have correctly configured your environment for working with integrate.ai, as described in Installing the SDK.
Authenticate with your Access Token
import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")
from integrate_ai_sdk.api import connect
client = connect(token=IAI_TOKEN)
Use your access token to authenticate to the API client. The SDK simplifies the authentication process by providing a connect helper module. You import the connect helper from the SDK, provide your token, and authenticate to the client.
Set an environment variable for your token (as described in Environment Setup) or replace it inline in the sample code.
Reminder: You can generate and manage tokens through the integrate.ai web portal.
Configure an EDA Session
To begin exploratory data analysis, you must first create a session, the same as you would for training a model. In this case, to configure the session, you must specify either the dataset_config or the num_datasets argument.
Using a dataset_config file
The dataset_config file is a configuration file that maps the name of one or more datasets to the columns to be pulled. Dataset names and column names are specified as key-value pairs in the file.
For each pair, the keys are dataset names that are expected for the EDA analysis. The values are a list of corresponding columns. The list of columns can be specified as column names (strings), column indices (integers), or a blank list to retrieve all columns from that particular dataset.
If a dataset name is not included in the configuration file, all columns from that dataset are used by default.
For example:
To retrieve all columns for a submitted dataset named dataset_one:
dataset_config = {"dataset_one": []}
To retrieve the first column and the column x2 for a submitted dataset named dataset_one:
dataset_config = {"dataset_one": [1,"x2"]}
To retrieve the first column and the column x2 for a submitted dataset named dataset_one and all columns in a dataset named dataset_two:
dataset_config = {"dataset_one": [1,"x2"],"dataset_two": []}
Specifying the number of datasets
You can manually set the number of datasets that are expected to be submitted for an EDA session by specifying a value for num_datasets. If num_datasets is not specified, the number is inferred from the number of datasets provided in dataset_config.
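For example, a minimal sketch that relies on num_datasets instead of a dataset_config; this assumes you want all columns from every submitted dataset to be analyzed.
# Create an EDA session for two datasets without naming them in a dataset_config
eda_session = client.create_eda_session(
    name="Testing notebook - EDA",
    description="EDA session configured with num_datasets",
    num_datasets=2
).start()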
Create and start an EDA session
dataset_config = {"dataset_one": [1,"x5","x7"], "dataset_two": ["x1","x10","x11"]}
eda_session = client.create_eda_session(
name="Testing notebook - EDA",
description="I am testing EDA session creation through a notebook",
data_config=dataset_config
).start()
eda_session.id
The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on two datasets, named dataset_one and dataset_two. It returns an EDA session ID that you can use to track and reference your session.
The dataset_config used here specifies that the first column (1), column x5, and column x7 will be analyzed for dataset_one, and that columns x1, x10, and x11 will be analyzed for dataset_two.
Since the num_datasets argument is not provided to client.create_eda_session(), the number of datasets is inferred as two from the dataset_config.
For more information, see the create_eda_session() definition in the API documentation.
Poll for session status
# Example of polling
import time
while eda_session.status == "running":
if eda_session.round is None:
print("Polling for Session info")
if eda_session.round is not None:
if eda_session.round > 0:
print("EDA session in progress.")
else:
print("EDA Session clean up")
print("-------------------------")
time.sleep(1)
You can log information about the session during this time.
Analyze the datasets
The results object is a dataset collection comprised of multiple datasets that can be retrieved by name. Each dataset is comprised of columns that can be retrieved by either column name or by index.
You can perform the same base analysis functions at the collection, dataset, or column level.
results = eda_session.results()
Example output:
EDA Session collection of datasets: ['dataset_two', 'dataset_one']
Describe
Use the .describe() method to retrieve a standard set of descriptive statistics.
If a statistical function is invalid for a column (for example, mean requires a continuous column and x10 is categorical), or the column from one dataset is not present in the other (for example, here x5 is in dataset_one, but not dataset_two), then the result is NaN.
results.describe()
results["dataset_one"].describe()
Statistics
For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.
results["dataset_one"][["x10", "x11"]].uniques()
You can call functions such as .mean(), .median(), and .std() individually.
results["dataset_one"].mean()
results["dataset_one"]["x1"].mean()
Histograms
You can create histogram plots using the .plot_hist() function.
saved_dataset_one_hist_plots = results["dataset_two"].plot_hist()
single_hist = results["dataset_two"]["x1"].plot_hist()
EDA in Intersect Mode
The Exploratory Data Analysis (EDA) Intersect feature for private record linkage (PRL) sessions enables you to access summary statistics about a group of datasets without needing access to the data itself. This allows you to get a basic understanding of the dataset when you don't have access to the data or you are not allowed to do any computations on the data. It enables you to understand more about the intersection between two datasets.
EDA is an important pre-step for federated modelling and a simple form of federated analytics. The feature has a built in differential privacy setting. Differential privacy (DP) is dynamically added to each histogram that is generated for each feature in a participating dataset. The added privacy protection causes slight noise in the end result.
At a high level, the process is similar to that of creating and running a session to train a model. The steps are:
- Run a PRL session to determine the intersection between your datasets.
- Configure an EDA Intersect session.
- Create and start the session.
- Run the session and poll for session status.
- Analyze the results.
Use the integrateai_eda_intersect_batch.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required. This documentation provides supplementary and conceptual information to expand on the code demonstration.
API Reference
The core API module that contains the EDA-specific functionality is integrate_ai_sdk.api.eda. This module includes a core object called EdaResults, which contains the results of an EDA session. If you are comfortable working with the integrate.ai SDK and API, see the API Documentation for details.
Configure an EDA Intersect Session
eda_data_config = {"prl_silo0":[],"prl_silo1":[]}
prl_session_id = "<PRL_SESSION_ID>"
This example uses AWS Batch to run the session using data in S3 buckets. Ensure that you have completed the AWS Batch and Fargate Manual Setup and that you have the correct roles and policies for AWS Batch and Fargate. See Running AWS Batch jobs with the SDK for details.
To begin exploratory data analysis, you must first create a session, the same as you would for training a model. To configure the session, specify the following:
- EDA data configuration (eda_data_config)
- prl_session_id for the PRL session you want to work with
The eda_data_config specifies the names of the datasets used to generate the intersection in the PRL session, in the format dataset_name: columns. If columns is empty ([]), then EDA is performed on all columns.
You must also specify the session ID of a successful PRL session using the datasets listed in the eda_data_config.
Column Correlation
If you want to find the correlation (or any other binary operation) between two specific columns, you must specify those columns as paired columns to ensure that the 2D histogram is calculated for them. Otherwise, only 1D histograms are generated for the columns separately, similar to the individual mode, except that they are constructed with only the overlapping samples from the intersection.
To set which pairs you are interested in, specify their names in a dictionary like data_config. For example, {"passive_client": ['x1', 'x5'], "active_client": ['x0', 'x2']} will generate 2D histograms for these pairs of columns: (x0, x1), (x0, x5), (x2, x1), (x2, x5), (x0, x2), (x1, x5).
paired_cols = {"active_client": ['x0', 'x2'], "passive_client": ['x1']}
Create and start an EDA intersect session
eda_session = client.create_eda_session(
name="EDA Intersect Session",
description="Testing EDA Intersect mode through a notebook",
data_config=eda_data_config,
eda_mode="intersect", #Generates histograms on an overlap of two distinct datasets
prl_session_id=prl_session_id,
hide_intersection=True, #Optional - specifies whether to apply CKKS encryption when generating the output. Defaults to true.
paired_columns=paired_cols #Optional - only required to generate 2D histograms
).start()
eda_session.id
The code sample demonstrates creating and starting an EDA session to perform privacy-preserving data analysis on the intersection of two distinct datasets. It returns an EDA session ID that you can use to track and reference your session.
For more information, see the create_eda_session() definition in the API documentation.
Start an EDA Intersect session using AWS Batch
# Example data paths in s3
train_path1 = 's3://sample-data.integrate.ai/prl/prl_silo0.csv'
train_path2 = 's3://sample-data.integrate.ai/prl/prl_silo1.csv'
test_path1 = 's3://sample-data.integrate.ai/prl/prl_silo0.csv'
test_path2 = 's3://sample-data.integrate.ai/prl/prl_silo1.csv'
# Specify the AWS parameters
model_storage = "s3://sample-data.integrate.ai"
job_queue='iai-client-batch-job-queue'
job_def='iai-client-batch-job'
Specify your AWS credentials if you are generating temporary ones. Otherwise, use the default profile credentials.
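For reference, the sketch below shows a credentials dictionary of the same shape as the one used in the PRL example earlier in this guide; it is illustrative, and you should use whichever credential mechanism applies to your environment.
import os

# Example of using temporary credentials (mirrors the earlier PRL session example)
aws_creds = {
    'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
    'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
    'REGION': os.environ.get("AWS_REGION"),
}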
Import the task builder and task group from the SDK.
from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
Specify the batch information to create the task builder object.
Create the task.
tb = taskbuilder_aws.batch(
job_queue=job_queue,
aws_credentials=aws_creds,
cpu_job_definition=job_def)
Create and run the task group.
task_group_context = SessionTaskGroup(eda_session)\
.add_task(tb.eda(dataset_path=train_path1, dataset_name="prl_silo0", vcpus='2', memory='16384', client=client))\
.add_task(tb.eda(dataset_path=train_path2, dataset_name="prl_silo1", vcpus='2', memory='16384', client=client)).start()
Create a task in the task group for each client. The number of tasks in the task group must match the number of clients specified in the data_config used to create the session.
The vcpus and memory parameters are optional overrides for the job definition.
Monitor submitted EDA Intersect jobs
# session available in group context after submission
print(task_group_context.session.id)
# status of tasks submitted
task_group_status = task_group_context.status()
for task_status in task_group_status:
print(task_status)
# Use to monitor if a session has completed successfully or has failed
# You can modify the time to wait as per your specific task
task_group_context.wait(300)
Each task in the task group kicks off a job in AWS Batch. You can monitor the jobs through the console or the SDK.
Submitted tasks are in the pending state until the clients join and the session is started. Once started, the status changes to running.
When the session completes successfully, "True" is returned. Otherwise, an error message appears.
Analyze the results
To retrieve the results of the session:
results = eda_session.results()
Example output:
EDA Session collection of datasets: ['active_client', 'passive_client']
Describe
You can use the .describe() function to review the results.
results.describe()
Example output:
Statistics
For categorical columns, you can use other statistics for further exploration. For example, unique_count, mode, and uniques.
results["active_client"][["x10", "x11"]].uniques()
Example output:
Mean
You can call functions such as .mean(), .median(), and .std() individually.
results["active_client"].mean()
Example output:
Histograms
You can create histogram plots using the .plot_hist() function.
saved_dataset_one_hist_plots = results["active_client"].plot_hist()
Example output:
single_hist = results["active_client"]["x10"].plot_hist()
Example output:
2D Histograms
You can also plot 2D-histograms of specified paired columns.
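The snippets below reference active_client and passive_client directly. A minimal sketch of the aliasing they assume is shown here; the variable names are hypothetical shortcuts for the per-dataset results rather than part of the documented API.
# Hypothetical aliases assumed by the 2D histogram, correlation, and groupby examples
active_client = results["active_client"]
passive_client = results["passive_client"]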
fig = results.plot_hist(active_client['x0'], passive_client['x1'])
Example output:
Correlation
You can perform binary calculations on columns specified in paired_columns, such as finding the correlation.
results.corr(active_client['x0'], passive_client['x1'])
Example output:
Addition, subtraction, division
Addition example. Change the operator to try subtraction, division, etc.
op_res = active_client['x0']+passive_client['x1']
fig = op_res.plot_hist()
Example output:
GroupBy
groupby_result = results.groupby(active_client['x0'])[passive_client['x5']].mean()
print(groupby_result)
MANUAL AWS DEPLOYMENT AND USE
AWS Batch and Fargate Manual Setup
AWS Requirements
This section outlines the setup steps required to configure your working environment. Steps that are performed in the AWS platform are not explained in detail. Refer to the AWS documentation as needed.
The requirements are tool-agnostic - that is, you can complete the steps through the AWS console, or through a tool such as Terraform or AWS CloudFormation.
AWS Authentication
aws ssm put-parameter --name iai-token --value <IAI_TOKEN> --type SecureString
Complete the steps to create an IAI access token, hereafter referred to as <IAI_TOKEN>.
On the AWS CLI, set the token as a parameter for your SSM agent. SSM handles getting and using the token as needed for the batch session.
# Example response
{
"Version": 1,
"Tier": "Standard"
}
About "secrets"
In order for the batch job to access the integrate.ai JWT through SSM, it must be set in the job definition secrets configuration.
Set the name of the secret to IAI_TOKEN to create the IAI_TOKEN environment variable that the docker client uses to authenticate with the session.
The valueFrom value is the SSM key that contains the integrate.ai access token. If you are running the batch job in the same region that the SSM parameter was created in, pass in the name of the SSM parameter. If the SSM parameter is in a different region, pass in the SSM parameter ARN.
To have different tokens for different user groups, you must have a different batch job definition for each token.
Create an AWS KMS Key
If you do not already have one, define a symmetric key for encryption and decryption in AWS KMS. The IAI server uses the ID of your key to retrieve the IAI access token through SSM.
- In the KMS console, click Create a Key.
- Keep the default settings for Symmetric and Encrypt and decrypt. Click Next.
- Type an Alias for the key. Click Next.
- Select one or more Key administrators and click Next.
- Select one or more IAM Users and/or Roles to grant access to the key. Note: The SDK User IAM Role must have GenerateDataKey and Encrypt permissions. The Fargate Task and/or Batch job roles must have Decrypt permission.
- Click Next, then click Finish.
Make a note of the Key ID. This parameter is required as part of the training session definition.
Set up integrate.ai components
# Create a repository for the client
aws ecr create-repository --repository-name iai_client
# Create a repository for the server
aws ecr create-repository --repository-name iai_fl_server
# Log in and upload the Docker images
aws ecr get-login-password --region <AWS_REGION> | docker login --username AWS --password-stdin <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com
# Tag and push the client image
docker tag 919740802015.dkr.ecr.<AWS_REGION>.amazonaws.com/edge/fl-client:<version> <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>
docker push <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>
#Tag and push the server image
docker tag 919740802015.dkr.ecr.<AWS_REGION>.amazonaws.com/edge/iai_fl_server:<version> <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_fl_server:<version>
docker push <AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_fl_server:<version>
Install the integrate.ai CLI tool, the SDK, and the client. See Installing the SDK for details.
Push the IAI client Docker image to an AWS ECR repository. See the AWS ECR documentation for detailed instructions for setting up ECR, then upload the integrate.ai client Docker image.
Note: The client and server versions change as updates are released. Make sure that you are always uploading the latest version by specifying the correct version number.
Set up the dataset
aws s3 cp </path/to/data> s3://<your.aws.data.bucket> --recursive
Create one or more S3 buckets to contain the dataset(s). The URL for the dataset is a required parameter for the SDK.
If necessary, copy the data to your S3 bucket, as shown in the example.
Note: Only S3 is supported.
Create AWS Batch Roles and Policies
This guide describes in brief how to create and manage roles and policies through the AWS IAM service console. The JSON configuration is also provided for those using Terraform or other tools.
You can create roles from the Roles link under Access Management in IAM.
Note: For the sample code that follows, replace any variable placeholders (such as <AWS_REGION> and <AWS_ACCOUNT>) with your own values.
EC2 Instance Role
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "batch.amazonaws.com"
}
}
]
}
arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
- Click Create Role.
- Select AWS Service.
- Select EC2.
- Click Next.
- In the Permissions policies search box, type AmazonEC2ContainerServiceforEC2. This is the ARN.
- Select this policy and click Next.
- Provide a meaningful name for the role.
- Review the configuration and compare it to the example below.
- Click Create role.
Batch Service Role
- Create an associated instance profile and batch service role.
- Click Create Role.
- Select AWS Service.
- Select Batch from the drop-down list, then select Batch.
- Click Next.
- On the Permissions policies page, click Next.
- Provide a meaningful name for the role.
- Review the configuration and compare it to the example below.
- Click Create role.
ECS Task Role (Job Role) with CloudWatch and S3 policies
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowLogGroupAndStreamAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:DescribeLogStreams",
"logs:PutLogEvents",
"logs:CreateLogGroup"
],
"Resource": [
"arn:aws:logs:<AWS_REGION>:<AWS_ACCOUNT>:log-group:/iai-client/batch:log-stream:*"
]
}
]
}
This role requires access to the S3 bucket containing your data. It requires the ECS task role policy and an S3 access policy.
- Click Create Role.
- Select AWS Service.
- Select Elastic Container Service from the drop-down list.
- Select Elastic Container Service Task.
- Click Next.
- On the Add Permissions page, click Create policy. This opens a Policies page in a second browser window to allow you to create policies and return to your role to attach them.
- On the Create policy page, select the JSON tab and paste the following, with your <AWS region> and <AWS account> filled in.
- Click Next: Tags, then click Next: Review.
- Provide a meaningful name for the policy.
- Click Create policy.
Add an S3 policy.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3BucketReadAccess",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetEncryptionConfiguration",
"s3:GetObject",
"s3:GetObjectAcl",
"s3:ListBucketVersions"
],
"Resource": [
"arn:aws:s3:::<your.aws.data.bucket>",
"arn:aws:s3:::<your.aws.data.bucket>/*"
]
}
]
}
- Click Create policy.
- On the Create policy page, select the JSON tab and paste the example, with the name of <your.aws.data.bucket> filled in.
The S3 policy on the ECS task role restricts the Job Definition to the S3 buckets that are referenced. To restrict data access further, consider creating different Job Definitions that access different S3 buckets.
- Click Next: Tags, then click Next: Review.
- Provide a meaningful name for the policy.
- Click Create policy.
Add policies to the role
- Return to the Add permissions page in the previous browser window.
- Use the Permissions policies search box to search for and select the names of the two policies you just created.
- Click Next.
- Provide a meaningful name for the role, such as ecs-task-role.
- Click Create role.
ECS Execution Role with ECR, CloudWatch, and SSM policies
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
This role gives the batch job access to ECR and SSM. It requires the ECS task role policy and ECR, CloudWatch, and SSM policies.
- Click Create Role.
- Select AWS Service.
- Select Elastic Container Service from the drop-down list.
- Select Elastic Container Service Task.
- Click Next.
- In the Permissions policies search box, type AmazonECSTaskExecutionRolePolicy.
- Select this policy, then click Next.
- Provide a meaningful name for the role, such as ecs-execution-role.
- Scroll down to Step 2: Add permissions and click Edit.
- Click Create policy.
- On the Create policy page, select the JSON tab and paste the example.
- Click Next: Tags and then click Next: Review.
- Provide a meaningful name for the policy.
- Click Create policy.
- In the Create role browser window, in the Permissions policies search box, type the name of the policy you just created. Note: you may have to click the Refresh button beside the Create policy button to make the new policy appear.
- Select the policy.
- Click Next.
- Repeat the process above to add an SSM policy. Fill in your <AWS region> and <AWS account> information.
# ECS Execution Role SSM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SSMAccessForIAIToken",
"Effect": "Allow",
"Action": [
"ssm:DescribeParameters",
"ssm:GetParameter",
"ssm:GetParameters"
],
"Resource": [
"arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/iai-token"
]
}
]
}
- Add the policy to your ECS Execution role.
- Click Next.
- Click Create Role.
SDK User Role
# Example IAM policy for SDK User Role
{
"Statement": [
{
"Action": [
"batch:SubmitJob"
],
"Effect": "Allow",
"Resource": [
"arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-queue/iai-client-batch-job-queue",
"arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-queue/iai-client-batch-job-queue/*",
"arn:aws:batch:<AWS_REGION>:<AWS_ACCOUNT>:job-definition/iai-client-batch-job"
],
"Sid": "AllowUserBatchJobSubmission"
},
{
"Action": [
"batch:DescribeJobs"
],
"Effect": "Allow",
"Resource": [
"*"
],
"Sid": "AllowUserDescribeBatch"
},
{
"Action": [
"ssm:DescribeParameters",
"ssm:GetParameter"
],
"Effect": "Allow",
"Resource": [
"arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/iai-token"
],
"Sid": "SSMAccessForIAIToken"
}
],
"Version": "2012-10-17"
}
The role and permissions required to set up and configure the environment are different than the permissions required to start an integrate.ai client batch job using the integrate.ai SDK.
The end user of the SDK requires, at minimum, permission to submit a batch job using the job queue and job definition created earlier, and permission to describe jobs.
Any additional permissions required depend on your application. For example, you may give the user the ability to access the SSM parameter set earlier, so that they can pull the integrate.ai access token and use it to manage sessions with the integrate.ai SDK.
Create Fargate roles and policies
Fargate Execution Role
# Fargate Execution role
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ecs-tasks.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Create an execution role to allow the batch job to access ECR and SSM.
In the IAM console, create an AWS Service role. Use the drop-down menu to select Elastic Container Service as the Use case, and select Elastic Container Service Task.
Under Permissions policies, select AmazonECSTaskExecutionRolePolicy.
Create a custom policy using the JSON below and add it to the execution role.
# Fargate Execution role SSM policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SSMAccessForTokens",
"Effect": "Allow",
"Action": [
"ssm:DescribeParameters",
"ssm:GetParameter",
"ssm:GetParameters"
],
"Resource": [
"arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
]
}
]
}
Note: The SSM policy uses a wildcard (*) in the token name to allow for flexible token use for this task.
Fargate Task Role
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "ecs-tasks.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
This task role enables Fargate to access CloudWatch for logging, and the VPC.
In the IAM console, create an AWS Service role. Use the drop-down menu to select Elastic Container Service as the Use case, and select Elastic Container Service Task.
This role requires policies for four services: CloudWatch, VPC, S3, and SSM.
Create a policy with the following JSON for CloudWatch.
Fargate Task role CloudWatch policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowLogGroupAndStreamAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:DescribeLogStreams",
"logs:PutLogEvents",
"logs:CreateLogGroup"
],
"Resource": [
"arn:aws:logs:<AWS_REGION>:<AWS_ACCOUNT>:log-group:/ecs/fl-server-fargate:*"
]
}
]
}
Fargate Task role VPC policy
The VPC policy enables the IAI server to retrieve public IPs for service discovery registration.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowVPC",
"Effect": "Allow",
"Action": [
"ec2:DescribeNetworkInterfaces"
],
"Resource": ["*"]
}
]
}
Fargate Task role S3 policy
The Fargate Task role uses the S3 policy to store models in S3 after they have been trained.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3BucketAccess",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetEncryptionConfiguration",
"s3:GetObjectAcl",
"s3:ListBucketVersions",
"s3:*Object"
],
"Resource": [
"arn:aws:s3:::<path to your data>",
"arn:aws:s3:::<path to your data>/*"
]
}
]
}
Fargate Task role SSM policy
The SSM policy allows access to required tokens.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SSMAccessForTokens",
"Effect": "Allow",
"Action": [
"ssm:DescribeParameters",
"ssm:GetParameter",
"ssm:GetParameters"
],
"Resource": [
"arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
]
}
]
}
Note: The SSM policy uses a wildcard (*) in the token name to allow for flexible token use for this task.
SDK User IAM Role
The SDK user requires permission to describe, start, and run an ECS Fargate task using the job definition provided. Any further permissions granted to the user depends on your application.
{
"Statement": [
{
"Action" : [
"ecs:DescribeContainerInstances",
"ecs:DescribeTasks",
"ecs:DescribeTaskDefinition",
"ecs:ListTasks",
"ecs:UpdateContainerAgent",
"ecs:StartTask",
"ecs:StopTask",
"ecs:RunTask"
],
"Effect" : "Allow",
"Resource": ["arn:aws:ecs:<AWS_REGION>:<AWS_ACCOUNT>:cluster/iai-server-ecs-cluster",
"arn:aws:ecs:<AWS_REGION>:<AWS_ACCOUNT>:task-definition/iai-server-fargate-job:*"]
}, {
"Sid" : "SSMAccessForTokens",
"Effect" : "Allow",
"Action" : [
"ssm:DescribeParameters",
"ssm:GetParameter",
"ssm:PutParameter
],
"Resource" : [
"arn:aws:ssm:<AWS_REGION>:<AWS_ACCOUNT>:parameter/*-token"
]
}, {
"Sid": "PassRole",
"Effect": "Allow",
"Action": [
"iam:PassRole"
],
"Resource": [
<fargate_task_role.arn>,
<fargate_execution_role.arn>
]
}
],
"Version": "2012-10-17"
}
Role and policy configuration is now complete.
Continue on to set up the AWS Batch job.
Create an AWS Compute Environment
Create a Compute environment for the batch job. For detailed instructions, see the AWS documentation. Configurations specific to the integrate.ai environment are described here.
The name of this environment is a required parameter for the SDK.
We recommend the following defaults for the compute environment.
min_vcpus: 4
max_vcpus: 256
instance_type: ["c4.4xlarge", "m4.2xlarge", "r4.2xlarge"]
type: "EC2"
When using Batch with AWS Fargate, add egress to port 9999 from port 443.
Create an AWS Batch Job Queue
name: "<iai-client-batch-job-queue>"
state: "ENABLED"
priority: 1
Create a Job queue. The name of this queue is a required parameter for the SDK.
The queue must be associated with the compute environment that you created or configured earlier.
There are no additional roles or policies required for the queue.
Create a Batch job definition
{
"image": "<AWS_ACCOUNT>.dkr.ecr.<AWS_REGION>.amazonaws.com/iai_client:<version>",
"vcpus": 1,
"memory": 60000,
"command": [
"hfl",
"Ref::task",
"--token",
"Ref::token",
"--session-id",
"Ref::sessionId",
"--train-path",
"Ref::trainPath",
"--test-path",
"Ref::testPath",
"--batch-size",
"Ref::batchSize",
"--instruction-polling-time",
"Ref::pollingTime",
"--log-interval",
"Ref::logInterval",
"--approve-custom-package"
],
"parameters": { "task": "train" },
"jobRoleArn": "<ECS_JOB_ROLE_ARN>",
"executionRoleArn": "<ECS_EXECUTION_ROLE_ARN>"
"volumes": [],
"mountPoints": [],
"ulimits": [
{
"name": "nofile",
"hardLimit": 10240,
"softLimit": 10240
}
],
"secrets": [{
"name": "IAI_TOKEN",
"valueFrom": "<iai-token>"
}],
"resourceRequirements": []
}
Create a Batch Job definition. The name of this definition is a required parameter for the SDK.
In the definition, you must specify the following:
- image - the integrate.ai client Docker image that you uploaded to ECR
- <AWS Account> and <AWS Region> - your AWS account ID and region
- <version> - the IAI client version
- jobRoleArn - the ECS Task Role you created
- executionRoleArn - the ECS Execution Role you created
Set up the Fargate environment
You can configure an AWS Fargate environment through the console UI, or through tools such as Terraform. The required components are an ECS cluster, security and log groups, and a job definition.
ECS cluster
resource "aws_ecs_cluster" "iai_server_ecs_cluster" {
name = "iai-server-ecs-cluster"
tags = {
Name = "IAI Server ECS Cluster"
}
}
There are no integrate.ai-specific settings required for the ECS cluster. You can use the default configuration.
EC2 Security Group
resource "aws_security_group" "iai_server_security_group" {
name = "iai_server_security_group"
vpc_id = <VPC_ID>
description = "Security group for IAI server ECS cluster"
tags = {
Name = "IAI Server Security Group"
}
ingress {
description = "Allow to accept HTTP requests"
from_port = 9999
to_port = 9999
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
description = "Allow nodes to access external APIs"
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}
The security group manages traffic for the Fargate server resources.
Cloudwatch Log Group
resource "aws_cloudwatch_log_group" "iai_server_fargate_log_group" {
name = "iai-server-fargate-log-group"
tags = {
Name = "IAI Server Log Group"
}
}
The log group captures and displays the output from task execution.
Fargate Job Definition
{
"family": "iai-server-fargate-job",
"type": "container",
"network_mode": "awsvpc",
"requires_compatibilities": ["FARGATE"],
"cpu": 1024,
"memory": 2048,
#Specify the Fargate Execution role
"execution_role_arn": "<ECS_EXECUTION_ROLE_ARN>",
#Specify the Fargate Task role
"task_role_arn": "<ECS_TASK_ROLE_ARN>",
"runtime_platform": {
"operating_system_family": "LINUX"
},
"placement_constraints": [],
"container_definitions": [{
"command": [],
"cpu": 0,
"disableNetworking": false,
"entryPoint": [],
"environment": [],
"essential": true,
#Specify the IAI server Docker image. Make sure you update this when the server version updates
"image": "<ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/iai_fl_server:<VERSION>",
"logConfiguration": {
"logDriver": "awslogs",
"options": {
#Specify the Fargate CloudWatch log group
"awslogs-group": "<LOG_GROUP_ID>",
"awslogs-region": "<AWS_REGION>",
"awslogs-stream-prefix": "ecs"
}
},
"mountPoints": [],
"name": "iai-server-fargate",
"portMappings": [
{
"containerPort": 9999,
"hostPort": 9999,
"protocol": "tcp"
}
],
"secrets": [{
"name": "IAI_TOKEN",
"valueFrom": "iai-token"
}],
"volumesFrom": []
}]
}
The job definition contains additional configuration.
This completes the configuration and setup for Fargate, Batch, and the integrate.ai components, as well as the roles, policies, and secrets required to run the server and client.
Running AWS Batch jobs with the SDK
AWS Batch provides convenient, scalable, machine learning computing that can be integrated with the SDK.
This tutorial assumes that you have configured the AWS Batch requirements, such as roles and permissions, as described in AWS Batch Manual Setup and that you have installed the SDK.
Use the integrateai_batch_client.ipynb notebook to follow along and test the examples shown below by filling in your own variables as required.
Federated learning models are trained through sessions. You define the parameters required to train a federated model, including data and model configurations, in a session. Additional session parameters are required when using AWS Batch.
Specify Batch parameters
# Training and test data paths
train_path1 = "s3://{path to training data}"
train_path2 = "s3://{path to training data}"
test_path = "s3://{path to test data}"
In addition to the session definition, there are AWS Batch-specific parameters required by the SDK to run a batch job.
Specify the path(s) to your training and test data on S3.
# Specify batch environment settings
job_queue='<aws batch job queue name>'
job_def='<aws batch job definition name>'
ssm_token='<name of iai token stored on SSM>'
Specify the job_queue, job_def, and ssm_token values that you created in Create an AWS Batch Job Queue.
AWS Authentication
# Example of setting temporary AWS credentials
aws_creds = {
'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
'SESSION_TOKEN': os.environ.get("AWS_SESSION_TOKEN"),
'REGION': os.environ.get("AWS_REGION"),
}
If you are generating temporary AWS Credentials, specify them as in the example. Otherwise, use the default profile credentials, or pass in a Dict of AWS credential values.
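If you are not passing temporary credentials explicitly, one way to build the same dictionary from your default AWS profile is sketched below (this assumes boto3 is installed and a default profile or instance role is available).
import boto3

# Resolve credentials from the default profile, environment, or instance role.
session = boto3.Session()
creds = session.get_credentials()

aws_creds = {
    'ACCESS_KEY': creds.access_key,
    'SECRET_KEY': creds.secret_key,
    'SESSION_TOKEN': creds.token,  # None for long-lived credentials
    'REGION': session.region_name,
}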
Define a training session
# Create a training session
training_session = client.create_fl_session(
name="Testing notebook",
description="I am testing a batch job through a notebook",
min_num_clients=2,
num_rounds=2,
package_name="iai_ffnet",
model_config=<model_config>,
data_config=<data_schema>,
)
Prepare your model configuration and data schema. See Model Packages for information on the models available out-of-the-box in integrate.ai, or see Building a Custom Model for information on building your own model.
Define your training session as usual. The session definition is passed to the batch job through the task group that contains the tasks for the batch.
Specify the model_config and data_config names for the configuration and schema that you want to use.
Running Batch with a task group and taskbuilder
# Import the required functions.
from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
Instead of running the integrate.ai client directly, import and use the taskgroup and taskbuilder functions. For each task, create a task object. Use the taskgroup to add each task to the batch.
One task is equivalent to one client in integrate.ai terms. The min_num_clients given in the training session definition must match the number of tasks defined in the batch.
# Create a taskbuilder object, and provide the required parameters.
tb = taskbuilder_aws.batch(ssm_token_parameter_key=ssm_token,
job_queue=<job_queue>,
aws_credentials=<aws_creds>,
cpu_job_definition=<job_def>)
Create a task group and start the batch
task_group_context = SessionTaskGroup(training_session)\
    .add_task(tb.hfl(train_path=train_path1,
                     test_path=test_path,
                     session=training_session,
                     vcpus='2',
                     memory='16384'))\
    .add_task(tb.hfl(train_path=train_path2,
                     test_path=test_path,
                     session=training_session,
                     vcpus='2',
                     memory='16384'))\
    .start()
The task group defines the training_session and the tasks to run for the batch. The example code snippet creates the task group and starts the job.
The vcpu and memory parameters are optional. Use them to adjust the values in the job definition if necessary.
Monitor submitted Batch jobs
The task group context contains the session ID.
# Print the session ID
print(task_group_context.session_id)
# Monitor the status
task_group_status = task_group_context.status()
for task_status in task_group_status:
print(task_status)
# Wait for session completion
task_group_context.wait(300)
You can also review the status of the jobs in the AWS Console.
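For example, you can combine the wait call with the SessionMetrics interface (described later under Evaluation metrics) to block until the batch finishes and then inspect the results. The sketch below assumes the client object created earlier in the notebook.
# Block until the session completes, then retrieve the trained session and its metrics.
task_group_context.wait(300)

session = client.fl_session(task_group_context.session_id)
print(session.metrics.as_dict())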
Running a training server on AWS Fargate
If this is your first time running a training session with a server in AWS Fargate, configure the Fargate environment, as described in AWS Batch and Fargate Manual Setup.
To follow along with this tutorial, open the integrateai_fargate_server.ipynb notebook.
Model config and data schema
# Example model configuration
model_config = {
"experiment_name": "test_synthetic_tabular",
"experiment_description": "test_synthetic_tabular",
"strategy": {"name": "FedAvg", "params": {}},
"model": {"params": {"input_size": 15, "hidden_layer_sizes": [6, 6, 6], "output_size": 2}},
"balance_train_datasets": False,
"ml_task": {
"type": "classification",
"params": {
"loss_weights": None,
},
},
"optimizer": {"name": "SGD", "params": {"learning_rate": 0.2, "momentum": 0.0}},
"differential_privacy_params": {"epsilon": 4, "max_grad_norm": 7},
"save_best_model": {
"metric": "loss", # to disable this and save model from the last round, set to None
"mode": "min",
},
"seed": 23, # for reproducibility
}
Prepare your model configuration and data schema. See Model Packages for information on the models available out-of-the-box in integrate.ai, or see Building a Custom Model for information on building your own model. A generic example that matches the sample notebook is provided below.
# Example data schema
data_schema = {
"predictors": ["x0", "x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "x11", "x12", "x13", "x14"],
"target": "y",
}
Create a training session
training_session = client.create_fl_session(
name="Fargate Testing notebook",
description="I am testing session creation with Fargate through a notebook",
min_num_clients=2,
num_rounds=2,
package_name="iai_ffnet",
model_config=model_config,
data_config=data_schema,
).start()
training_session.id # Print the training session ID for reference
This example session uses 2 clients and 2 rounds. The training_session definition is passed to the server as part of the task definition.
Note: If you are using a custom model, ensure that you specify the correct model_config and data_schema names.
Specifying optional AWS Credentials
If you are generating temporary AWS credentials, specify them here. Otherwise use the default profile credentials.
# Example of specifying temporary AWS credentials
aws_creds = {
'ACCESS_KEY': os.environ.get("AWS_ACCESS_KEY_ID"),
'SECRET_KEY': os.environ.get("AWS_SECRET_ACCESS_KEY"),
'SESSION_TOKEN': os.environ.get("AWS_SESSION_TOKEN"),
'REGION': os.environ.get("AWS_REGION"),
}
Specify the Fargate Cluster, Task Definition Name and Network Parameters
# Specify the name of your cluster, task definition and network parameters
fargate_cluster = '<fargate_cluster_name>'
task_def = '<fargate_task_definition>'
subnet_id = '<vpc_network_subnet_id>'
security_group = '<iai_server_security_group>'
train_path1 = '{train path 1}'
train_path2 = '{train path 2}'
test_path = '{test path}'
job_queue= '<fargate-job-queue>'
job_def='<iai-server-fargate-job>'
model_storage='{model storage path}'
Ensure that you have configured the cluster, task definition, and network parameters on AWS first, then specify them as variables for the SDK.
With the credentials and variables defined, you can now use the SDK to run the training server on AWS Fargate.
Run the training server
# Create a Fargate task builder object
from integrate_ai_sdk.taskgroup.taskbuilder import aws as taskbuilder_aws
tb = taskbuilder_aws.fargate(
aws_credentials=<aws_creds>,
cluster=<fargate_cluster>,
task_definition=<task_def>)
The SDK provides a taskgroup and taskbuilder object to simplify the process of creating and managing Fargate and AWS Batch tasks.
# Create an AWS Batch task builder object
tb_batch = taskbuilder_aws.batch(
aws_credentials=<aws_creds>,
job_queue=<job_queue>,
cpu_job_definition=<job_def>)
Create a taskgroup
from integrate_ai_sdk.taskgroup.base import SessionTaskGroup
task_group_context = SessionTaskGroup(training_session)\
    .add_task(tb.fls(subnet_id, security_group, ssm_token_key, storage_path=model_storage))\
    .add_task(tb_batch.hfl(train_path=train_path1, test_path=test_path, vcpus='2', memory='16384'))\
    .add_task(tb_batch.hfl(train_path=train_path2, test_path=test_path, vcpus='2', memory='16384'))\
    .start()
The taskgroup starts the server and the batch.
Here we are creating a session task group that takes as input the training_session created earlier. The first task added (tb) starts the server. The tb_batch task is added twice - once for each client.
Tip: See Create a training session to review the session definition.
You can monitor the running server to check training progress.
task_group_context.wait(300)
REFERENCE
Setting up a Docker GPU Environment (Optional)
Set up a GPU environment if you are building a model with image or video data.
Prerequisites
- Ensure your machine has one or more CUDA-capable GPUs.
- The integrate.ai client uses PyTorch with CUDA 11.3. The minimum requirement is CUDA 11.3, but it is recommended that you install the latest driver available. For more information, see: CUDA major component versions.
Linux/MacOS Setup
- Install the CUDA driver or CUDA toolkit:
  - Install the CUDA toolkit (which includes the driver, but also contains other unnecessary components):
    - If using an rpm/deb package manager, you can install the driver only by using sudo apt-get/yum -y install cuda-drivers
  - Install the CUDA driver only:
    - If it's a data centre GPU, see: https://docs.nvidia.com/datacenter/tesla/tesla-installation-notes/index.html
    - For other types of graphics cards, go to the Nvidia driver page, select the corresponding graphics card and operating system, then download and install the driver.
- A system reboot may be required to load the driver.
- Install NVidia container toolkit:
- Follow this guide
- Ensure nvidia-docker2 can modify the docker configuration file /etc/docker/daemon.json
Windows Setup
- Ensure that Intel VT-x or AMD SVM is enabled in the BIOS. Check the motherboard manufacturer's documentation for the exact steps.
- Install the CUDA driver or CUDA toolkit:
  - Install the CUDA toolkit (which includes the driver, but also contains other unnecessary components). In the component selection screen, you can choose to install only the CUDA driver.
  - Install the CUDA driver only: go to the Nvidia driver page, select the corresponding graphics card and operating system, then download and install the driver.
- Set up Docker with WSL2.
Running a Docker container with a GPU device
Add the --gpus all option to the docker run command.
Example: docker run --gpus all -it -d --name $SILO_NAME -v <data_path>:/root/demo 919740802015.dkr.ecr.ca-central-1.amazonaws.com/edge/fl-client:latest
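To confirm that the container can actually see the GPU, a quick check from inside the running container (assuming PyTorch is available in the integrate.ai client image) is:
import torch

# Should print True and the name of the first CUDA device if the --gpus flag took effect.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))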
Model Packages
Feed Forward Neural Nets (iai_ffnet)
Feedforward neural nets are a type of neural network in which information flows through the nodes in a single direction.
Examples of use cases include:
- Classification tasks like image recognition or churn conversion prediction.
- Regression tasks like forecasting revenues and expenses, or determining the relationship between drug dosage and blood pressure
HFL FFNet
The iai_ffnet model is a feedforward neural network for horizontal federated learning (HFL) that uses the same activation for each hidden layer.
This model only supports classification and regression. Custom loss functions are not supported.
Privacy
DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.
VFL SplitNN
integrate.ai also supports the SplitNN model for vertical federated learning (VFL). In this model, neural networks are trained on data spread across multiple clients. A private record linkage (PRL) session is required for all datasets involved. There are two types of VFL sessions: train and predict. To make predictions, the PRL session ID and the corresponding training session ID are required.
For more information, see PRL Privacy for VFL and PRL Sessions.
Generalized Linear Models (GLMs)
This model class supports a variety of regression models. Examples include linear regression, logistic regression, Poisson regression, gamma regression and inverse Gaussian regression models. We also support regularizing the model coefficients with the elastic net penalty.
Examples of use cases include [1]:
- Agriculture / weather modeling: number of rain events per year, amount of rainfall per event, total rainfall per year
- Risk modeling / insurance policy pricing: number of claim events / policyholder per year, cost per event, total cost per policyholder per year
- Predictive maintenance: number of production interruption events per year, duration of interruption, total interruption time per year
The iai_glm model trains generalized linear models by treating them as a special case of single-layer neural nets with particular output activation functions.
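As an illustration only (not a verbatim configuration from the product), a model_config for a Poisson GLM could follow the same shape as the FFNet example elsewhere in this document, with the alpha and l1_ratio elastic net parameters that the validation schema requires for GLM task types. The model params shown here are placeholders.
# Illustrative iai_glm configuration; values are placeholders, not recommendations.
glm_model_config = {
    "experiment_name": "glm_poisson_example",
    "experiment_description": "Poisson regression with an elastic net penalty",
    "strategy": {"name": "FedAvg", "params": {}},
    "model": {"params": {"input_size": 15, "output_size": 1}},  # hypothetical model params
    "ml_task": {
        "type": "poisson",
        "params": {"alpha": 0.1, "l1_ratio": 0.5},  # elastic net penalty, required for GLM tasks
    },
    "optimizer": {"name": "SGD", "params": {"learning_rate": 0.05, "momentum": 0.0}},
    "differential_privacy_params": {"epsilon": 4, "max_grad_norm": 7},
}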
Privacy
DP-SGD (differentially private stochastic gradient descent) is applied as an additional privacy-enhancing technology. The basic idea of this approach is to modify the gradients used in stochastic gradient descent (SGD), which lies at the core of almost all deep learning algorithms.
References [1]: https://scikit-learn.org/stable/modules/linear_model.html#generalized-linear-regression
Strategy Library
Reference guide for available training strategies.
FedAvg
Federated Averaging strategy implemented based on https://arxiv.org/abs/1602.05629
Parameters
- fraction_fit (float, optional): Fraction of clients used during training. Defaults to 0.1.
- fraction_eval (float, optional): Fraction of clients used during validation. Defaults to 0.1.
- min_fit_clients (int, optional): Minimum number of clients used during training. Defaults to 1.
- min_eval_clients (int, optional): Minimum number of clients used during validation. Defaults to 1.
- min_available_clients (int, optional): Minimum number of total clients in the system. Defaults to 1.
- eval_fn (Callable[[Weights], Optional[Tuple[float, float]]], optional): Function used for validation. Defaults to None.
- on_fit_config_fn (Callable[[int], Dict[str, Scalar]], optional): Function used to configure training. Defaults to None.
- on_evaluate_config_fn (Callable[[int], Dict[str, Scalar]], optional): Function used to configure validation. Defaults to None.
- accept_failures (bool, optional): Whether or not to accept rounds containing failures. Defaults to True.
- initial_parameters (Weights, optional): Initial global model parameters.
FedAvgM
Federated Averaging with Momentum (FedAvgM) strategy https://arxiv.org/pdf/1909.06335.pdf
Parameters
Uses the same parameters as FedAvg, as well as the following:
- initial_parameters (Weights, optional): Initial global model parameters.
- server_learning_rate (float): Server-side learning rate used in server-side optimization. Defaults to 1.0, which is the same as the vanilla FedAvg
- server_momentum (float): Server-side momentum factor used for FedAvgM. Defaults to 0.0.
- nesterov (bool): Enables Nesterov momentum. Defaults to False.
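Strategies are selected through the strategy object in the model configuration. As a hedged sketch (parameter values are illustrative, not tuned recommendations), switching a session from FedAvg to FedAvgM with server-side momentum could look like this:
# Illustrative strategy selection in an existing model_config dictionary.
model_config["strategy"] = {
    "name": "FedAvgM",
    "params": {
        "server_learning_rate": 1.0,  # same as vanilla FedAvg
        "server_momentum": 0.9,
        "nesterov": False,
    },
}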
FedAdam
Adaptive Federated Optimization using Adam (FedAdam) https://arxiv.org/abs/2003.00295
Parameters
Uses the same parameters as FedAvg, as well as the following:
- initial_parameters (Weights, optional): Initial global model parameters.
- eta (float, optional): Server-side learning rate. Defaults to 1e-1.
- beta_1 (float, optional): Momentum parameter. Defaults to 0.9.
- beta_2 (float, optional): Second moment parameter. Defaults to 0.99.
- tau (float, optional): Controls the degree of adaptability for the algorithm. Defaults to 1e-3.
FedAdagrad
Adaptive Federated Optimization using Adagrad (FedAdagrad) strategy https://arxiv.org/abs/2003.00295
Parameters
Uses the same parameters as FedAvg, as well as the following:
- initial_parameters (Weights, optional): Initial global model parameters.
- eta (float, optional): Server-side learning rate. Defaults to 1e-1.
- beta_1 (float, optional): Momentum parameter. Defaults to 0.0. Note that typical AdaGrad does not use momentum, so beta_1 is usually kept at 0.0.
- tau (float, optional): Controls the degree of adaptability for the algorithm. Defaults to 1e-3. Smaller tau means higher degree of adaptability of server-side learning rate.
FedOpt
Adaptive Federated Optimization (FedOpt) abstract strategy https://arxiv.org/abs/2003.00295
Parameters
Uses the same parameters as FedAvg, as well as the following:
- initial_parameters (Weights, optional): Initial global model parameters.
- eta (float, optional): Server-side learning rate. Defaults to 1e-1.
- beta_1 (float, optional): Momentum parameter. Defaults to 0.0.
- beta_2 (float, optional): Second moment parameter. Defaults to 0.0.
- tau (float, optional): Controls the degree of adaptability for the algorithm. Defaults to 1e-3. Smaller tau means higher degree of adaptability of server-side learning rate.
FedYogi
Federated learning strategy using Yogi on server-side https://arxiv.org/abs/2003.00295v5
Parameters
- initial_parameters (Weights, optional): Initial global model parameters.
- eta (float, optional): Server-side learning rate. Defaults to 1e-1.
- beta_1 (float, optional): Momentum parameter. Defaults to 0.9.
- beta_2 (float, optional): Second moment parameter. Defaults to 0.99.
- tau (float, optional): Controls the degree of adaptability for the algorithm. Defaults to 1e-3.
Evaluation metrics
Metrics are calculated for each round of training.
When the session is complete, you can see a set of metrics for all rounds of training, as well as metrics for the final model.
Retrieve Metrics for a Session
Use the SessionMetrics class of the API to store and retrieve metrics for a session. You can retrieve the model performance metrics as a dictionary (Dict), or plot them. See the API Class Reference for details.
Typical usage example:
client = connect("token")
already_trained_session_id = "<sessionID>"
session = client.fl_session(already_trained_session_id)
# retrieve the metrics for the session as a dictionary
metrics = session.metrics.as_dict()
- Authenticate and connect to the integrate.ai client.
- Provide the session ID that you want to retrieve the metrics for as the already_trained_session_id.
- Call the SessionMetrics class.
Available Metrics
The Federated Loss value for the latest round of model training is reported as the global_model_federated_loss (float) attribute for an instance of SessionMetrics.
This is a model level metric reported for each round of training. It is a weighted average loss across different clients, weighted by the number of examples/samples from each silo.
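In other words, each client's loss is weighted by its number of samples. A small sketch of the calculation with illustrative values:
# Per-client (loss, number_of_samples) pairs reported for one round of training.
client_results = [(0.42, 10_000), (0.55, 2_500)]

federated_loss = sum(loss * n for loss, n in client_results) / sum(n for _, n in client_results)
print(federated_loss)  # 0.446, weighted toward the larger silo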
See the metrics by machine learning task in the following table:
Classification and Logistic | Regression and Normal | Poisson, Gamma, Inverse Gaussian |
---|---|---|
Loss (cross-entropy) | Loss (mean squared error) | Loss (unit deviance) |
ROC_AUC | R2 score | R2 score |
Accuracy | | |
Building a Custom Model
If the standard integrate.ai models (FFNet, GLM, GBM) do not suit your needs, you can create a custom model. If you are working with non-tabular data, you can also create a custom dataloader to use with your custom model.
integrate.ai supports custom models built with PyTorch (for example, CNNs, LSTMs, Transformers, and GLMs).
Most supervised learning tasks are supported, provided the model includes only DP-compatible modules (either natively compatible, or replaceable with a compatible substitute).
- Some modules (for example, LSTM) are not compatible with DP-SGD out-of-the-box and must be replaced by their DP equivalent.
- BatchNorm is not supported due to the nature of DP-SGD and must be replaced by alternative modules.
Customization is restricted in the following ways:
- Models must use one of the following loss functions:
  - Classification
  - Regression
  - Logistic
  - Normal
  - Poisson
  - Gamma
  - Inverse Gaussian
- Tasks requiring custom loss functions (e.g., image segmentation, object detection) are not supported.
- Only a single output train function is supported.
- Data augmentation is not supported.
- The generic training flow cannot be customized. Therefore, complex tasks that require a special training flow are not supported. The current supported flow (see the sketch after this list) is:
  - Pass input through the model.
  - Calculate the loss of the model outputs against the true target variables.
  - Perform backpropagation to update the model weights.
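For reference, that generic flow corresponds to the standard PyTorch training step shown in the following conceptual sketch. It is not integrate.ai's internal implementation; the model, loss function, and optimizer are stand-ins.
import torch
from torch import nn

# Conceptual stand-ins; in a real package these come from your model.py, config, and dataset files.
model = nn.Linear(15, 2)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.2)
inputs, targets = torch.randn(8, 15), torch.randint(0, 2, (8,))

optimizer.zero_grad()
outputs = model(inputs)           # 1. pass input through the model
loss = loss_fn(outputs, targets)  # 2. calculate loss against the true target variables
loss.backward()                   # 3. backpropagation
optimizer.step()                  #    update the model weights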
Download template and examples
Start by cloning the integrateai-samples repo. This repo contains everything you need to learn about and create your own custom model package. Review these examples and the API documentation before getting started.
The repo contains several folders, some of which are relevant to custom models:
- template_package folder that contains template_model.py and template_dataset.py.
- Use these files as starting points when creating a custom model and data loader.
- Two example custom model packages, each including a readme file that explains how to test and upload the custom model.
- cifar10_vgg16 folder - contains a VGG net model that is ideal for large-scale image recognition.
- lstmTagger folder - contains an LSTM (long short term memory) RNN model, which is ideal for processing sequences of data.
Create a custom model package
Using the template files provided, create a custom model package.
Follow the naming convention for files in the custom package: no spaces, no special characters, no hyphens, all lowercase characters.
- Create a folder to contain your custom model package. For this tutorial, this folder is named myCustomModel, and is located in the same parent folder as the template folder.
Example path: C:<workspace>\integrate_ai_sdk\sample_packages\myCustomModel
- Create two files in the custom model package folder:
  a. model.py - the custom model definition. You can rename template_model.py as a starting point for this file.
  b. <model-class-name>.json - default model inputs for this model. It must have the same name as the model class name that is defined in the model.py file. If you are using the template files, the default name is templatemodel.json.
- Optional: To use a custom dataloader, you must also create a dataset.py file and a dataset configuration JSON file in the same folder. For more information, see the template_dataset.py example in the integrateai-samples repo.
If there is no custom dataset file, the default TabularDataset loader is used. It loads .parquet and .csv files, and requires predictors: ["x1", "x2"] and target: y as input for the data configuration. This is what is used for the standard models.
The example below provides the boilerplate for your custom model definition. Fill in the code required to define your model. Refer to the model.py files provided for the lstmTagger and cifar10_vgg16 examples if needed.
from integrate_ai_sdk.base_class import IaiBaseModule
class TemplateModel(IaiBaseModule):
def __init__(self):
"""
Here you should instantiate your model layers based on the configs.
"""
super(TemplateModel, self).__init__()
def forward(self):
"""
The forward path of a model. Can take an input tensor and return a prediction tensor
"""
pass
if __name__ == "__main__":
template_model = TemplateModel()
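As a concrete but purely illustrative example of filling in this template, a small feedforward classifier might look like the following. The class name, layer sizes, and defaults are placeholders; following the naming convention described below, the matching JSON config file would then be named mycustommodel.json.
import torch.nn as nn
from integrate_ai_sdk.base_class import IaiBaseModule

class MyCustomModel(IaiBaseModule):
    def __init__(self, input_size=15, hidden_size=8, output_size=2):
        # Instantiate the model layers based on the config values.
        super(MyCustomModel, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        # Take an input tensor and return a prediction tensor.
        return self.layers(x)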
Custom model configuration inputs
Create a JSON file that defines the model inputs for your model.
It must have the same name as the model class name that is defined in the model.py file (e.g. templatemodel.json).
The content of this file is dictated by your model.
Tip: File paths can be specified as S3 URLs.
The following parameters are required:
Parameter (type) | Description |
---|---|
experiment_name (string) | The name of your experiment. |
experiment_description (string) | A description of your experiment. |
strategy (object) | The federated learning strategy to use and any required parameters. Supported strategies are: FedAvg, FedAvgM, FedAdam, FedAdagrad, FedOpt, FedYogi. See the Strategy Library for details. |
model (object) | The model type and any required parameters. |
ml_task (object) | The machine learning task type and any required parameters. Supported types are: regression, classification, logistic, normal, Poisson, gamma, and inverseGaussian. |
optimizer (object) | The optimizer and any required parameters. See the https://pytorch.org/docs/stable/optim.html package description for details. |
differential_privacy_params (object) | The differential privacy settings, including epsilon (the privacy budget; larger values correspond to less privacy protection and potentially better model performance) and max_grad_norm (the upper bound for clipping gradients). |
Example: FFNet model using a FedAvg strategy
{
"experiment_name": "Test session",
"experiment_description": "This is a test session",
"job_type": "training",
"strategy": {
"name": "FedAvg",
"params": {}
},
"model": {
"type": "FFNet",
"params": {
"input_size": 200,
"hidden_layer_sizes": [80,40,8],
"output_size": 2,
"hidden_activation": "relu"
}
},
"ml_task": "classification",
"optimizer": {
"name": "SGD",
"params": {
"learning_rate": 0.03,
"momentum": 0
}
},
"differential_privacy_params": {
"epsilon": 1,
"max_grad_norm": 7,
"delta": 0.000001
},
"eval_metrics": [
"accuracy",
"loss",
"roc_auc"
]
}
Reference model validation schema
Below is the outline for the full schema used to validate the model configuration inputs for GLM and FFNet models. This schema is provided for reference.
{
"$schema": "https://json-schema.org/draft-07/schema#",
"title": "FL Model Config",
"description": "The model config for an FL model",
"type": "object",
"properties": {
"experiment_name": {
"type": "string",
"description": "Experiment Name"
},
"experiment_description": {
"type": "string",
"description": "Experiment Description"
},
"strategy": {
"type": "object",
"properties": {
"name": {
"enum": [
"FedAvg"
],
"description": "Name of the FL strategy"
},
"params": {
"type": "object",
"properties": {
"fraction_fit": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Proportion of clients to use for training. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
},
"fraction_eval": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Proportion of clients to use for evaluation. If fraction * total_num_users is smaller than min_num_clients set in the session config, then min_num_clients will be used."
},
"accept_failures": {
"type": "boolean",
"description": "Whether to accept failures during training and evaluation. If False, the training process will be stopped when a client fails, otherwise, the failed client will be ignored."
}
},
"additionalProperties": false
}
},
"required": [
"name",
"params"
]
},
"model": {
"type": "object",
"description": "Model type and parameters",
"properties": {
"params": {
"type": "object",
"description": "Model parameters"
}
},
"required": [
"params"
]
},
"ml_task": {
"type": "object",
"description": "Type of ML task",
"properties": {
"type": {
"enum": [
"regression",
"classification",
"logistic",
"normal",
"poisson",
"gamma",
"inverseGaussian"
]
},
"params": {
"type": "object"
}
},
"required": ["type", "params"],
"allOf": [
{
"if": {
"properties": { "type": { "enum": ["regression", "classification"] } }
},
"then": {
"properties": { "params": { "type": "object" } }
}
},
{
"if": {
"properties": { "type": { "enum": [
"logistic",
"normal",
"poisson",
"gamma",
"inverseGaussian"
] } }
},
"then": {
"properties": { "params": {
"type": "object",
"properties": {
"alpha": {
"type": "number",
"minimum": 0
},
"l1_ratio": {
"type": "number",
"minimum": 0,
"maximum": 1
}
},
"required": ["alpha", "l1_ratio"]
} }
}
}
]
},
"optimizer": {
"type": "object",
"properties": {
"name": {
"enum": [
"SGD"
]
},
"params": {
"type": "object",
"properties": {
"learning_rate": {
"type": "number",
"minimum": 0,
"description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
},
"momentum": {
"type": "number",
"minimum": 0,
"description": "See https://pytorch.org/docs/stable/generated/torch.optim.SGD.html#torch.optim.SGD for details"
}
},
"required": [
"learning_rate"
],
"additionalProperties": false
}
},
"required": [
"name",
"params"
],
"additionalProperties": false
},
"differential_privacy_params": {
"type": "object",
"properties": {
"epsilon": {
"type": "number",
"minimum": 0,
"description": "Privacy budget. Larger values correspond to less privacy protection, and potentially better model performance."
},
"max_grad_norm": {
"type": "number",
"minimum": 0,
"description": "The upper bound for clipping gradients. A hyper-parameter to tune."
}
},
"required": [
"epsilon",
"max_grad_norm"
]
},
"eval_metrics": {
"description": "A list of metrics to use for evaluation",
"type": "array",
"minItems": 1,
"items": {}
}
},
"required": [
"experiment_name",
"experiment_description",
"strategy",
"model",
"ml_task",
"optimizer",
"differential_privacy_params"
]
}
Test and upload the custom model
Before you start training your custom model, you should test it and upload it to your workspace. The method for uploading also tests the model by training a single epoch locally. After the model has been successfully uploaded, you or any user with access to the model can train it in a session.
To test and upload a custom model, use the upload_model method:
def upload_model(
self,
package_path: str,
dataset_path: str,
package_name: str,
sample_model_config_path: str,
sample_data_config_path: str,
batch_size: int,
task: str,
test_only: bool,
description: str
):
where:
Argument (type) | Description |
---|---|
package_path (str) | Path to your custom model folder |
dataset_path (str) | Path to the dataset(s) |
package_name (str) | Name of the custom model package. It must be unique from other previously uploaded package names. |
sample_model_config_path (str) | Path to the model configuration JSON file |
sample_data_config_path (str) | Path to the dataset configuration JSON file |
batch_size (int) | Number of samples to propagate through the network at a time |
task (str) | Either 'classification' or 'regression'. Set it to 'regression' for numeric and 'classification' for categorical target. |
test_only (bool) | If set to True, perform one epoch training to test the model without uploading it. If set to False, tests and uploads the model if the test passes. |
description (str) | A description of the model (maximum 1024 characters). This description also appears in the integrate.ai web portal. |
This method tests a custom model by creating the model based on the custom model configuration (JSON file) and then training it with one epoch locally. If the model fails the test, it cannot be uploaded.
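For example, a call against the custom model package created above might look like the sketch below. The paths, names, and dataset are placeholders, and this assumes the connected SDK client object exposes upload_model as shown.
client.upload_model(
    package_path="./myCustomModel",                                 # folder containing model.py and the config JSON
    dataset_path="./sample_data/train.parquet",                     # local or S3 path to a sample dataset
    package_name="my_custom_model",
    sample_model_config_path="./myCustomModel/mycustommodel.json",
    sample_data_config_path="./myCustomModel/dataset_config.json",  # hypothetical dataset config file
    batch_size=16,
    task="classification",
    test_only=True,                                                 # run the one-epoch test without uploading
    description="Illustrative custom model upload",
)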
When starting a session with your custom model, make sure you specify the correct package_name, model_config, and data_config file names. For details, see create_fl_session in the API documentation.
User Authentication
Sharing access to training sessions and shared models in a simple and secure manner is a key requirement for many data custodians. integrate.ai provides a secure method of authenticating end users with limited permissions through the SDK to enable privileged access.
As the user responsible for managing access through the integrate.ai platform, you have the ability to generate an unscoped API token through the integrate.ai UI. Unscoped API tokens provide full access to the integrate.ai SDK. You can run client training tasks locally, or on remote data.
In the case that you want to create a token that has limited access, to enforce governance standards or provide an end user of your platform with limited access to the integrate.ai SDK, you can create scoped API tokens. Scoped tokens grant limited permissions, which enables you to control the level of access to trained sessions and models.
In the UI, you can view your personal API tokens as well as all scoped API tokens created in your organization's workspace through the SDK. These scoped user tokens are designed for use with the integrate.ai SDK. Tokens are tied to user identities through a unique ID, which is logged with each user action.
Limiting user access by token greatly reduces the security risk of leaked credentials. For example, with an unscoped API token, a user could run tasks on a remote machine, where there is a risk that it could be leaked or exposed because it is shared in an outside (non-local) environment. To mitigate that risk, you can instead provide the user with a scoped token that has limited permissions and a short lifespan (maximum 30 days).
Create an unscoped token
As the user who manages other users' access, you must first create your own unscoped token.
- Log in to your workspace through the portal.
- On the Dashboard, click Generate Access Token.
- Copy the access token and save it to a secure location.
Treat your API tokens like passwords and keep them secret. When working with the API, use the token as an environment variable instead of hardcoding it into your programs. In this documentation, the token is referenced as <IAI_TOKEN>.
Install components
Install the integrate.ai command-line tool (CLI), the SDK, and the client. For detailed instructions, see Installing the SDK.
Open sample notebook
Open the Authentication sample notebook (integrateai_auth.ipynb) located in the SDK package.
...integrate_ai_sdk/src/integrate_ai_sdk/sample_packages/sample_notebook/
The notebook provides sample code that demonstrates how to use the SDK to generate users and tokens.
Create a user
from integrate_ai_sdk.auth import connect_to_auth_client
from integrate_ai_sdk.auth.scopes import Scope
import os
IAI_TOKEN = os.environ.get("IAI_TOKEN")
auth_client = connect_to_auth_client(token=IAI_TOKEN)
Create a connection to the auth client with your unscoped token.
user_name = '{user_name}'
user = auth_client.create_user(user_name)
Create a user. Specify a user name (for example, demo-user, or user1@mycompany.com).
The SDK generates a unique ID for the user in the integrate.ai platform.
Example output:
01/27/2023 11:20:24:INFO:Machine user demo-user(f1bd9ff87c@integrate.ai) created by <your-email>.
01/27/2023 11:20:24:INFO:User activated.
Create a scoped token for the user
token = auth_client.create_token(user_id=user_name, scopes=[Scope.create_session, Scope.read_user_session])
print(token)
Create a scoped token for the user. Include only the scopes that the user requires to work with the system and their data.
This request returns the unique user ID (the generated email), a list of the granted scopes, and the token, as well as the token ID and the user name.
Copy and save the token somewhere secure to share with the user.
Available Scopes
- start_session - Allows a user to start a federated learning training session.
- create_session - Allows a user to create a training session.
- cancel_user_session - Allows a user to cancel a session they have created.
- read_user_session - Allows a user to read a session they have created.
- read_all_session - Allows a user to read a session created by any user.
- custom_model - Allows a user to upload and use a custom model.
- delete_user_session - Allows a user to delete a session that they have created.
- model_download - Allows a user to download a trained model.
- user_model_download - Allows a user to download a trained model created by themselves or integrate.ai.
Verify user and token through the SDK
The token_info command allows you to inspect the details of a token. You must specify the token to inspect.
auth_client.token_info(token['token'])
Example output:
{'customer': 'your-environment-name',
'email': 'generated-email@integrate.ai',
'realm': 'ifl',
'role': 'admin',
'env': 'prod',
'token_id': '55a19b5d077d40a798aa51ace57168c3',
'iss': 'integrate.ai',
'iat': 1674832855,
'renewable': True,
'scope': 'create_session read_model read_session read_user_session',
'user_id': 'demo-user',
'user_type': 'generated', //Indicates whether the user was created through the SDK
'active': True}
Verify user and token through the UI
To confirm that the user and token were created successfully, you can also view them in the web dashboard.
- Log in to the web dashboard.
- Click Token Management.
- Click User Scoped Tokens.
- Locate the user name for the user you created.
Revoke a scoped token
User scoped tokens have a default lifespan of thirty (30) days. To revoke a token before it expires, use the revoke_token command in the SDK.
You must provide the token_id for the token that you want to revoke. You can find this ID in the web dashboard.
auth_client.revoke_token(token['token_id'])
Delete a user
Users that you create through the SDK can be deleted through the SDK.
Specify the name of the user that you want to delete.
auth_client.delete_user(user_name)
PRIVACY & SECURITY
Differential privacy
What is differential privacy?
Differential privacy is a technique for providing a provable measure of how “private” a data set can be. This is achieved by adding a certain amount of noise when responding to a query on the data. A balance needs to be struck between adding too much noise (making the computation less useful), and too little (reducing the privacy of the underlying data).
The technique introduces the concept of a privacy-loss parameter (typically represented by ε (epsilon)), which can be thought of as the amount of noise to add for each invocation of some computation on data. A related concept is the privacy budget, which can be chosen by the data curator.
This privacy budget represents the measurable amount of information exposed by the computation before the level of privacy is deemed unacceptable.
The benefit of this approach is that by providing a quantifiable measure, there can be a guarantee about how “private” the release of information is. However, in practice, relating the actual privacy to the computation in question can be difficult: i.e. how private is private enough? What will ε need to be? These are open questions in practice for practitioners when applying DP to an application.
How is Differential Privacy used in integrate.ai?
Users can add Differential Privacy to any model built in integrate.ai. DP parameters can be specified during session creation, within the model configuration.
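For example, the differential_privacy_params object carries the DP settings in the model configuration; the sketch below mirrors the values used in the examples elsewhere in this document.
# Differential privacy settings supplied as part of the model_config at session creation.
model_config["differential_privacy_params"] = {
    "epsilon": 4,        # privacy budget: larger values mean less privacy protection
    "max_grad_norm": 7,  # upper bound for clipping per-sample gradients
}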
When is the right time to use Differential Privacy?
Overall, differential privacy can be best applied when there is a clear ability to select the correct ε for the computation, and/or it is acceptable to specify a large enough privacy loss budget to satisfy the computation needs.
PRL Privacy for VFL
Private record linkage is a secure method for determining the overlap between datasets
In vertical federated learning (VFL), the datasets shared by any two parties must have some overlap and alignment in order to be used for machine learning tasks. There are typically two main problems:
- The identifiers between the two datasets are not fully overlapped.
- The rows of the filtered, overlapped records for the datasets are not in the same order.
To resolve these differences while maintaining privacy, integrate.ai applies private record linkage (PRL), which consists of two steps: determining the overlap (or intersection) and aligning the rows.
Private Set Intersection
First, Private Set Intersection (PSI) is used to determine the overlap without storing the raw data centrally, or exposing it to any party. PSI is a privacy-preserving technique that is considered a Secure multiparty computation (SMPC) technology. This type of technology uses cryptographic techniques to enable multiple parties to perform operations on disparate datasets without revealing the underlying data to any party.
Additionally, integrate.ai does not store the raw data on a server. Instead, the parties submit the paths to their datasets. integrate.ai uses the ID columns of the datasets to produce a hash locally that is sent to the server for comparison. The secret for this hash is shared only through Diffie–Hellman key exchange between the clients - the server has no knowledge of it.
Private Record Alignment
Once the ID columns are compared, the server knows which dataset indices are common between the two sets and can align the rows. This step is the private record alignment portion of PRL. It enables machine learning tasks to be performed on the datasets.
For more information about running PRL sessions, see PRL Sessions.
Release Notes
28 Aug 2023: Session Credit Usage
This release provides users with the ability to see their credit usage in their workspace UI. Each training or analytic session uses a certain number of credits from the user's allotment. This usage can now be monitored through a graph, with daily details. Users can also request additional credit when needed.
Version: 9.6.2
14 Aug 2023: Azure Task Runners
This release expands the Control Plane system architecture to include Microsoft Azure Task Runners.
Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environments, which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.
For more information about task runners and control plane capabilities, see Using integrate.ai.
A tutorial for using Azure task runners is available here.
Version: 9.6.1
14 July 2023: AWS Task Runners
This release introduces the Control Plane system architecture and AWS Task Runners.
Task runners simplify the process of creating an environment to run federated learning tasks. They use the serverless capabilities of cloud environments (such as AWS Batch and Fargate), which greatly reduces the administration burden and ensures that resource cost is only incurred while task runners are in use.
For more information about task runners and control plane capabilities, see Using integrate.ai.
Version: 9.6.0
17 May 2023: 2D Histograms for EDA Intersect & Linear Inference
This release introduces two new features:
- The ability to generate 2D histograms for EDA sessions in Intersect mode. This feature requires the addition of a paired_cols parameter. For more information, see the Intersect Mode tutorial.
- A new model package for linear inference. This package is particularly useful for GWAS training. For more information, see Linear Inference Sessions.
This release also introduces a single version number that describes the entire release package. This release is version 9.5.0.
27 April 2023: PRL, VFL, and EDA Intersect
This release introduces the following new features:
- The ability to perform private record linkage (PRL) on two datasets. A guide is available here.
- The ability to perform exploratory data analysis (EDA) in Intersect mode, using a PRL session result. A guide is available here.
- The ability to perform Vertical Federated Learning (VFL) in both training and prediction mode. A guide is available here.
Note: this release does not support Python 3.11 due to a known issue in that Python release.
Versions:
- SDK: 0.9.2
- Client: 2.4.2
- Server: 2.6.0
- CLI Tool: 0.0.46
30 Jan 2023: User Authentication
This release added two new features:
- The ability to train Gradient Boosted HFL Models. A guide is available here.
- The ability to generate scoped (non-admin) tokens for users. A guide is available here.
Bug fixes:
- Clients may get disconnected from the server when training large models
Versions:
- SDK: 0.5.36
- Client: 2.0.18
- Server: 2.2.19
- CLI Tool: 0.0.38
08 Dec 2022: Integration with AWS Fargate
This release introduces the ability to run an IAI training server on AWS Fargate through the integrate.ai SDK. With an integrate.ai training server running on Fargate, your data in S3 buckets, and clients running on AWS Batch, you can use the SDK to manage and run fully remote training sessions. A guide is available here.
Versions:
- SDK: 0.5.13
- Client: 2.0.11
- CLI Tool: 0.0.33
02 Nov 2022: Integration with AWS Batch
This release introduces the ability to run AWS Batch jobs through the integrate.ai SDK. Building on the previous release, which added support for data hosted in S3 buckets, you can now start and run a remote training session on remote data with jobs managed by AWS Batch. A guide is available here.
Features:
- Added the ability to run the iai client through AWS Batch
- Added the ability for the iai client to retrieve a token through the IAI_TOKEN environment variable
- Added a version command for the iai client: iai client version. Note: Docker must be running for the version command to return a value.
- Added support for a new session "pending" status
BREAKING CHANGE:
Session status mapping in the SDK has been updated as follows:
- created -> Created
- started -> Running
- pending -> Pending
- failed -> Failed
- succeeded -> Completed
- canceled -> Canceled
Bug fixes:
- Fixed an issue with small batch sizes
Versions:
- SDK: 0.3.31
- Client: 1.0.15
- CLI Tool: 0.0.31
06 Oct 2022: Exploratory Data Analysis & S3 Support
Features:
- Exploratory Data Analysis (EDA) - integrate.ai now supports the ability to generate histograms for each feature of a dataset. Use the results of the EDA session to calculate summary statistics for both continuous and categorical variables. See more about the feature here.
- This feature has Differential Privacy applied automatically to each histogram to add noise and reduce privacy leakage. The Differential Privacy settings are dynamic and applied to best suit each dataset individually, to ensure privacy protection without excessive noise.
- S3 data path support - load data from an S3 bucket for the iai client hfl and iai client eda commands. You can use S3 URLs as the data_path, given that your AWS CLI environment is properly configured. Read more on how to configure this integration here.
- Client logging via the iai client log command - this new feature in the integrate-ai CLI package allows a user to access session logs from clients, to be used as a tool to help debug failed sessions. Access this using the iai client log command.
Versions:
- SDK: 0.3.20
- Client: 1.0.8
- CLI Tool: 0.0.21
14 Sept 2022: Infrastructure upgrades for session abstraction
- SDK Version: 0.3.5
- Client Version: 1.0.2
- CLI Tool Version: 0.0.21