A Priority-Based Scheduler for Amazon SageMaker Training Jobs | by Chaim Rand | Mar, 2024

Optimizing the use of limited AI training accelerators (Part 2)

Photo by Adrien Aletti on Unsplash

This post was created in collaboration with Max Rabin.

This is the second part of a series of posts on the subject of maximizing the utility of scarce AI resources. In the first post we noted the increasing limitations on the ability to scale up AI resources at will and, as a consequence, the growing trend of AI development teams to guarantee AI compute capacity by means such as building up an in-house AI server farm and/or reserving dedicated instances in the cloud. The scarcity of AI compute resources motivates the design of specialized scheduling solutions to minimize idle time and prioritize critical workloads. Please see our previous post in which we proposed a detailed list of requirements for such solutions. The approach we took there was to leverage the existing priority-based scheduler that comes with Kubernetes and align our training development workflow to its use. In this post we explore the alternative of maintaining our existing framework for training AI models and enhancing it with our own custom implementation of a priority-based scheduler. Importantly, the need for this type of solution is often motivated not just by the scarcity of AI resources, but also by the desire to increase control over the orchestration and prioritization of training workloads so as to reduce development costs. For example, even in a scenario of abundant capacity, you may choose to limit your use to a fixed number of training instances so as to cap your training expenditure.

For the purposes of this post, we will assume that our training framework of choice is AWS's managed service for AI model training, Amazon SageMaker. The solution we will propose will use additional AWS services such as Amazon DynamoDB and AWS Lambda. The choice to demonstrate our solution using AWS services should not be viewed as an endorsement. There are many cloud-based service offerings available and the best one for you will depend on the particular details of your project. Similar solutions to the one that we will describe can be designed on other cloud-based environments and/or using alternative cloud-based services.

Traditionally, we would start up a SageMaker training job using the Amazon SageMaker Python SDK. In the code block below we use the SageMaker SDK (version 2.208) to run a PyTorch training workload on a single instance of type p5.48xlarge.

from sagemaker.pytorch import PyTorch

# define job
estimator = PyTorch(
    role='<sagemaker role>',
    entry_point='train.py',
    instance_type='ml.p5.48xlarge',
    instance_count=1,
    framework_version='2.0.1',
    py_version='py310',
    tags=[{'Key': 'priority', 'Value': '100'}]
)

# start job
estimator.fit()

When the estimator.fit() function is called, the SageMaker library uploads our code to Amazon S3 and then transforms the request to a boto3 SageMaker client create_training_job request (see here).
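
To give a sense of what that underlying request looks like, the following is a rough, abridged sketch of an equivalent direct boto3 call. All values are placeholders, and the request that the SDK actually assembles contains additional fields (e.g., the uploaded source directory and further hyperparameters).

import boto3

# an illustrative approximation of the request generated by estimator.fit();
# every value below is a placeholder
client = boto3.client('sagemaker')
client.create_training_job(
    TrainingJobName='pytorch-training-2024-03-01-10-00-00-000',
    AlgorithmSpecification={
        'TrainingImage': '<pytorch 2.0.1 training image uri>',
        'TrainingInputMode': 'File'
    },
    RoleArn='<sagemaker role arn>',
    ResourceConfig={
        'InstanceType': 'ml.p5.48xlarge',
        'InstanceCount': 1,
        'VolumeSizeInGB': 30
    },
    OutputDataConfig={'S3OutputPath': 's3://<bucket>/output'},
    StoppingCondition={'MaxRuntimeInSeconds': 86400},
    HyperParameters={'sagemaker_program': 'train.py'},
    Tags=[{'Key': 'priority', 'Value': '100'}]
)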

This method for starting up training jobs is dependent on the availability of the requested resources for its success. In our scenario of scarce AI resources, it is likely to fail more often than not. Although this can be partially mitigated by retaining provisioned compute instances for successive workloads, the API does not provide the appropriate tooling for maximizing their utility. Let’s suppose that we wish to utilize precisely two p5.48xlarge instances. To simplify our discussion, let’s assume that each training workload runs on a single instance. Typically, during an AI model development cycle there will be periods when there are more than two training workloads that are waiting to be processed. The existing API would try to start up a third p5.48xlarge instance and would most likely fail due to its limited availability. Even when there is instance availability, we may wish to limit our training to just our two designated instances to increase our control over the costs of training.

We require a new API for submitting jobs for training, one that does not immediately start up a new p5.48xlarge instance, but rather enters the jobs to a priority queue. And we need an associated job scheduler that manages the use of our two resources while prioritizing critical workloads.

Importantly, please note that as of the time of this writing, Amazon SageMaker does not support the option of training on reserved Amazon EC2 instances. And although Amazon SageMaker Savings Plans has similar properties to instance reservations, it does not guarantee instance capacity. In a previous post we addressed this limitation and proposed using SageMaker managed warm pools as an alternative method for retaining access to provisioned instances. For the remainder of the post, we will assume that we are able to attain two instances of our choice whether it be through this or some other method.
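
As a reminder, a managed warm pool is requested via the estimator's keep_alive_period_in_seconds setting. The snippet below is a minimal sketch; the keep-alive duration is arbitrary and should be tuned to your workload.

from sagemaker.pytorch import PyTorch

# request a managed warm pool so that the provisioned instance is retained
# between successive jobs (sketch only; the duration below is arbitrary)
estimator = PyTorch(
    role='<sagemaker role>',
    entry_point='train.py',
    instance_type='ml.p5.48xlarge',
    instance_count=1,
    framework_version='2.0.1',
    py_version='py310',
    keep_alive_period_in_seconds=300  # keep the instance warm for 5 minutes
)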

In this section we will describe the components of our proposed solution. We will use the AWS Serverless Application Model (SAM) specification. More specifically, we will create an AWS SAM template YAML file and gradually add the AWS resources that we need. Please see the documentation for details on how to define and deploy serverless solutions using AWS SAM.

AWS Architecture Diagram (by Author)

We start by using Amazon API Gateway to define a private REST API for submitting training job requests. We name the API training-job-queue. Later, we will add a POST method called add-job and modify our training-job creation code to use this method instead of the SageMaker client create_training_job API. The code block below contains the definition of the private API resource in SAM. In practice you will likely want to specify access limitations to the API and/or a method of authorization.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  InternalAPI:
    Type: AWS::Serverless::Api
    Properties:
      # Auth: # Add access control to API
      EndpointConfiguration:
        Type: PRIVATE
        # VPCEndpointIds: # Specify VPC Endpoint(s)
      Name: training-job-queue
      StageName: prod

Define an AWS DynamoDB Table for Storing Training Job Requests

We will use an Amazon DynamoDB table named sagemaker-queue to store the submitted training workloads. Each entry will have the following fields (an illustrative entry appears after the list):

  1. jobName: Stores the unique name of the training job.
  2. entryTime: Stores the date and time that the job was added.
  3. jobState: Stores the current state of the training job. The valid values are ‘pending’, ‘running’, and ‘preempted’.
  4. priority: Stores an integer value representing the relative priority of the job.
  5. jobDetails: Stores the details of the job request.
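
For illustration, a newly queued entry for a hypothetical job might look as follows (the jobDetails string, which holds the full create_training_job request, is abridged):

# an illustrative queue entry; all values are hypothetical
example_entry = {
    'jobName': 'job1',
    'entryTime': '2024-03-01T10:00:00',
    'jobState': 'pending',
    'priority': 100,
    'jobDetails': '{"TrainingJobName": "job1", "Tags": [{"Key": "priority", "Value": "100"}], ...}'
}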

We define our DynamoDB table in our SAM template YAML file using the AWS::Serverless::SimpleTable resource.

  DynamoSMQueue:
    Type: AWS::Serverless::SimpleTable
    Properties:
      PrimaryKey:
        Name: jobName
        Type: String
      TableName: sagemaker-queue

We define a function that creates a table entry from a given training job request. We assume that the request contains the same contents as the input to the create_training_job API in JSON format. We further assume that the priority of the workload is entered as a key-value tag in the training job definition.

import json, boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sagemaker-queue')

def add_job_entry(job_json):
    job_details = json.loads(job_json)

    # extract job_name
    job_name = job_details['TrainingJobName']
    print(f'add entry {job_name}')

    # get current time
    entry_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    # default priority is 0
    priority = 0

    # update priority based on tags
    tags = job_details['Tags']
    for tag in tags:
        if tag['Key'] == 'priority':
            priority = int(tag['Value'])
            break

    # create entry
    entry = {
        'jobName': job_name,
        'entryTime': entry_time,
        'jobState': 'pending',
        'priority': priority,
        'jobDetails': job_json
    }
    table.put_item(Item=entry)  # TODO handle errors
    print(f'Added job {job_name} to queue')

The REST API add-job method that we will soon define will be programmed to call the add_job_entry function.

We define a second function that extracts the pending jobs from the database and returns them in order of priority. In the case that multiple jobs have the same priority, they are ordered according to the amount of time they have been waiting in the queue.

from boto3.dynamodb.conditions import Attr

# Get a list of all pending jobs sorted by priority
def get_pending_jobs():
    response = table.scan(
        ProjectionExpression='jobName, priority, entryTime',
        FilterExpression=Attr('jobState').ne('running')
    )
    jobs = response.get('Items', [])

    # sort jobs, first by priority (descending) and then by entryTime
    sorted_jobs = sorted(jobs,
                         key=lambda x: (-x['priority'], x['entryTime']))

    return sorted_jobs

The following utility functions will come in handy in the subsequent sections.

# Get a jobName -> priority mapping of all running jobs
def get_running_jobs_dict():
    # Get all running jobs
    response = table.scan(
        ProjectionExpression="jobName, priority",
        FilterExpression=Attr('jobState').eq('running')
    )
    jobs = response.get('Items', [])

    running_jobs = {job['jobName']: job['priority'] for job in jobs}

    return running_jobs

# Print the queue state
def print_queue_state():
    response = table.scan(
        ProjectionExpression='jobName, jobState, priority'
    )
    jobs = response.get('Items', [])

    print_table = []
    for job in jobs:
        print_table.append([job['jobName'], job['jobState'], job['priority']])

    # sort by priority
    sorted_table = sorted(print_table,
                          key=lambda x: -x[2])
    # Print the table
    from tabulate import tabulate
    print(tabulate(sorted_table, headers=['Job Name', 'State', 'Priority']))

# get job details
def get_job_details(job_name):
    response = table.get_item(
        Key={'jobName': job_name},
        ProjectionExpression='jobDetails'
    )
    return json.loads(response.get('Item').get('jobDetails'))

# get job state or None if the job does not exist
def get_job_state(job_name):
    response = table.get_item(
        Key={'jobName': job_name},
        ProjectionExpression='jobState'
    )
    job = response.get('Item')
    return job.get('jobState') if job else None

# update the job state
def update_job_state(job_name, new_state):
    table.update_item(
        Key={'jobName': job_name},
        UpdateExpression="SET jobState = :new_state",
        ExpressionAttributeValues={":new_state": new_state}
    )
    print(f'Updated job {job_name} to {new_state}')

# remove a job entry
def remove_job(job_name):
    table.delete_item(
        Key={'jobName': job_name}
    )
    print(f'Removed job {job_name} from queue')

Both our choice of DynamoDB and our usage of it (e.g., our use of the Scan API rather than the Query API) assume that the overall number of jobs in our queue will be in the dozens, at most. For a larger scale solution, you may be better off with a heavier duty database (e.g., one that performs the sorting operation for you) or a more sophisticated use of DynamoDB (e.g., see here).
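
For example, a more scalable variation (a sketch only) could rely on a hypothetical global secondary index, named jobState-index here, with jobState as its partition key and priority as its numeric sort key, letting DynamoDB return the pending jobs already sorted by priority:

from boto3.dynamodb.conditions import Key

# sketch of a Query-based alternative to the Scan above; assumes a hypothetical
# GSI ('jobState-index') with jobState as partition key and priority as sort key
def get_pending_jobs_via_query():
    response = table.query(
        IndexName='jobState-index',
        KeyConditionExpression=Key('jobState').eq('pending'),
        ScanIndexForward=False  # highest priority first
    )
    # note: preempted jobs (which we also treat as runnable) would require a
    # second query, and ties in priority still need a client-side sort by entryTime
    return response.get('Items', [])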

Define the Training Job Queue Manager

The main component of our solution is the training job scheduler. Here we implement a rather simple manager that performs the following steps:

  1. Extract the list of queued jobs, ordered by priority. If none exist, return.
  2. Discover unused instance capacity. For each free instance, start one pending job on SageMaker. If no jobs remain after that, return.
  3. Calculate the number of SageMaker jobs in the Stopping state. If greater than the number of pending jobs, return.
  4. Assess the need for preemption of running SageMaker jobs by comparing their priorities to those of our pending jobs.

The code block below contains our implementation of these steps:

# set the limit on total number of instances/jobs
MAX_CAPACITY = 2

sagemaker = boto3.client('sagemaker')

# apply a queue stamp to identify that the job came from the queue
def apply_qstamp(job_name):
    return f'{job_name}-qstamp-{datetime.now().strftime("%d%H%M")}'

# strip the queue stamp
def strip_qstamp(job_name):
    return job_name.split('-qstamp-')[0]

# start a SageMaker job and update the job entry in the queue
def start_job(job_name):
    print(f'start job {job_name}')
    job_details = get_job_details(job_name)
    job_details['TrainingJobName'] = apply_qstamp(job_name)
    if job_details:
        # start job with details from queue
        # (you may optionally overwrite fields such as the iam role)
        response = sagemaker.create_training_job(**job_details)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f'started job {job_name}')
            update_job_state(job_name, 'running')

# preempt a SageMaker job and update the job entry in the queue
def preempt_job(job_name):
    print(f'preempt job {job_name}')
    response = sagemaker.stop_training_job(TrainingJobName=job_name)
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print(f'preempted job {job_name}')
        update_job_state(strip_qstamp(job_name), 'preempted')

# get SageMaker jobs
def get_sagemaker_jobs(status):
    running = sagemaker.list_training_jobs(StatusEquals=status)
    return running.get('TrainingJobSummaries', [])

# queue manager
def manage_queue():
    # extract pending jobs to run
    pending = get_pending_jobs()

    if not pending:
        return

    if len(pending) > MAX_CAPACITY:
        pending = pending[:MAX_CAPACITY]

    # get running sagemaker jobs
    running = get_sagemaker_jobs('InProgress')
    total_running = len(running)

    # get stopping sagemaker jobs
    stopping = get_sagemaker_jobs('Stopping')
    total_stopping = len(stopping)

    # calculate the number of free instances
    free_slots = MAX_CAPACITY - total_running - total_stopping

    jobs_to_start = min(len(pending), free_slots)

    # for each free instance, start a job
    for i in range(jobs_to_start):
        start_job(pending[i].get('jobName'))

    still_pending = pending[jobs_to_start:]

    if not still_pending:
        return

    # assume that 'total_stopping' number of jobs will start soon
    test_for_preemption = len(still_pending) - total_stopping
    if test_for_preemption <= 0:
        return

    # check if preemption is required
    test_priority = still_pending[total_stopping:]

    running_jobs = get_running_jobs_dict()
    priority_dict = {}
    for job in running:
        job_name = job['TrainingJobName']
        priority_dict[job_name] = running_jobs[strip_qstamp(job_name)]

    # sort running jobs from lowest to highest priority
    sorted_running = sorted(priority_dict.items(), key=lambda item: item[1])

    index = 0
    while index < test_for_preemption and \
          test_priority[index].get('priority') > sorted_running[index][1]:
        preempt_job(sorted_running[index][0])
        index = index + 1

Important notes:

  1. Our implementation is highly optimistic in the sense that we assume that all the jobs that are inserted are valid and that we are able to start them up on SageMaker without issue. In practice, appropriate error handling should be added (e.g., removing faulty jobs from the queue with appropriate logging).
  2. In a production environment, we would need to take into consideration the likely occurrence of a race condition when our queue manager is triggered by multiple concurrent events. There are several ways of addressing this problem (e.g., see here), including enforcing atomicity (e.g., by setting our Lambda function concurrency to 1), using some form of locking mechanism (e.g., as done here), or making our function idempotent. Here we have taken the approach of what we call “optimistic idempotence”, where we rely on appropriate use of the API and on the idempotency of our underlying calls to the SageMaker APIs.
  3. We emphasize that our implementation is naïve. In practice, we recommend a more sophisticated algorithm that 1) accounts for the use of multiple types of instances and jobs that require more than one instance, 2) takes all edge cases into consideration, and 3) is tailored towards the specific needs of your project.

Define the AWS Lambda Function

The next component of the solution is the Lambda function. The following code block includes the SAM definition of our serverless function. We program the function to run on two different types of events: any call to add-job on our private API gateway and a change to the state of a SageMaker training job.

  ManagedTrainingJobQueue:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: job-queue/ # the directory containing our index.py file
      Handler: index.lambda_handler
      Runtime: python3.12
      Architectures:
        - arm64 # use graviton
      Policies: # allow access to SageMaker and DynamoDB
        - !Sub "arn:${AWS::Partition}:iam::aws:policy/AmazonSageMakerFullAccess"
        - DynamoDBCrudPolicy:
            TableName: !Ref DynamoSMQueue
      Events:
        CreateTraining:
          Type: Api
          Properties:
            Path: /add-job
            Method: post
            RestApiId: !Ref InternalAPI
        SageMakerEvent:
          Type: EventBridgeRule
          Properties:
            Pattern:
              source:
                - aws.sagemaker
              detail-type:
                - SageMaker Training Job State Change
              detail:
                TrainingJobStatus:
                  - "Completed"
                  - "Failed"
                  - "Stopped"

The lambda_handler function is implemented as follows (an abridged example of the SageMaker state-change event that it handles appears after the code block):

def lambda_handler(event, context):
    # identify the source of the event and take appropriate action
    if 'requestContext' in event and 'apiId' in event['requestContext']:
        print('Lambda triggered by API Gateway')
        job_details = event.get('body')  # the body is the training-job JSON string
        add_job_entry(job_details)
    elif 'source' in event and event['source'] == 'aws.sagemaker':
        print('Lambda triggered by SageMaker job state change')
        job_name = event['detail']['TrainingJobName']
        job_status = event['detail']['TrainingJobStatus']
        print(f'{job_name} status changed to {job_status}')

        # strip qstamp from job_name
        job_name = strip_qstamp(job_name)

        if job_status in ['Completed', 'Failed']:
            remove_job(job_name)
        elif job_status == 'Stopped':
            # check if it was manually stopped or preempted by the queue manager
            if get_job_state(job_name) == 'preempted':
                print(f'job {job_name} preemption completed')
            else:
                print(f'job {job_name} {job_status}, remove from queue')
                remove_job(job_name)

    # in all cases invoke the queue manager
    manage_queue()
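
For reference, the SageMaker state-change events delivered by EventBridge have roughly the following shape. This is an abridged, illustrative example; all field values are hypothetical and the real event carries many more fields in its detail section.

# an illustrative (abridged) EventBridge event for a SageMaker training job
# state change; all values are hypothetical
sample_event = {
    "version": "0",
    "id": "844e2571-85d4-695f-b930-0153b71dcb42",
    "detail-type": "SageMaker Training Job State Change",
    "source": "aws.sagemaker",
    "account": "123456789012",
    "time": "2024-03-01T10:20:00Z",
    "region": "us-east-1",
    "resources": [
        "arn:aws:sagemaker:us-east-1:123456789012:training-job/job1-qstamp-011000"
    ],
    "detail": {
        "TrainingJobName": "job1-qstamp-011000",
        "TrainingJobStatus": "Completed"
        # ... additional training job description fields omitted
    }
}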

Intercept the Create Training Job Request

The final modification required to make our solution complete is to intercept the call to the SageMaker create_training_job API and reroute it to our add-job method. We do this by overriding the _intercept_create_request function of the SageMaker Session class:

from sagemaker.pytorch import PyTorch
from sagemaker.session import Session
import requests, logging, json
logger = logging.getLogger('sagemaker')

def submit_to_training_queue(job):
    logger.info(f'Adding training-job {job["TrainingJobName"]} to queue')
    logger.debug(f'train request: {json.dumps(job, indent=4)}')

    vpce = '<vpc endpoint>'  # insert id of vpc endpoint
    region = 'us-east-1'  # specify region
    url = f'https://{vpce}.execute-api.{region}.vpce.amazonaws.com/prod/add-job'
    headers = {'x-apigw-api-id': '<api-id>'}  # insert api gateway id

    # submit job
    response = requests.post(url, headers=headers, json=job)

class QueueTrainingJobSession(Session):
    def _intercept_create_request(self, request, create, func_name=None):
        """This function intercepts the create job request

        Args:
            request (dict): the create job request
            create (functor): a functor that calls the sagemaker client create method
            func_name (str): the name of the function needing interception
        """
        if func_name == 'train':
            submit_to_training_queue(request)
        else:
            super()._intercept_create_request(request, create, func_name)

# define job
estimator = PyTorch(
    role='<sagemaker role>',
    entry_point='train.py',
    instance_type='ml.p5.48xlarge',
    instance_count=1,
    framework_version='2.0.1',
    py_version='py310',
    tags=[{'Key': 'priority', 'Value': '100'}],
    keep_alive_period_in_seconds=60,  # keep warm for 1 minute
    # use our custom Session class
    sagemaker_session=QueueTrainingJobSession()
)

estimator.fit(wait=False)

To test our solution we submit the following sequence of jobs. After each call we print the status of the queue (using the print_queue_state function) and sleep for twenty seconds. A rough sketch of such a test driver appears after the list below.

  1. Start job1 with priority 1.
  2. Start job2 with priority 2.
  3. Start job3 with priority 1.
  4. Start job4 with priority 3.
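
The following is a rough sketch of how such a test might be driven. The launch_job helper is hypothetical; it simply wraps the estimator definition from the previous section, passing the desired job name and priority (exact details will vary for your project).

import time

# hypothetical helper that wraps the estimator definition from the previous
# section and submits a named job with the given priority to the queue
def launch_job(job_name, priority):
    estimator = PyTorch(
        role='<sagemaker role>',
        entry_point='train.py',
        instance_type='ml.p5.48xlarge',
        instance_count=1,
        framework_version='2.0.1',
        py_version='py310',
        tags=[{'Key': 'priority', 'Value': str(priority)}],
        keep_alive_period_in_seconds=60,
        sagemaker_session=QueueTrainingJobSession()
    )
    estimator.fit(job_name=job_name, wait=False)

# submit the four test jobs, printing the queue state after each submission
for job_name, priority in [('job1', 1), ('job2', 2), ('job3', 1), ('job4', 3)]:
    launch_job(job_name, priority)
    print_queue_state()
    time.sleep(20)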

The first two jobs are immediately submitted to SageMaker and updated to the running state. Since the third job has low priority and we have precisely two training instances, it remains in the pending state and waits its turn. After submitting the first three jobs, the queue state appears as:

Job Name    State      Priority
----------  -------    --------
job2        running    2
job1        running    1
job3        pending    1

The fourth job we submit has a higher priority than all of the jobs in the queue. Consequently, the running job with the lowest priority, job1, is preempted. The corresponding SageMaker job is stopped and once the instance is released, the queue state becomes:

Job Name    State        Priority
----------  ---------    --------
job4        running      3
job2        running      2
job1        preempted    1
job3        pending      1

The SageMaker job running job2 is the first to finish, job2 is removed from the queue, and our preempted job is resumed:

Job Name    State      Priority
----------  -------    --------
job4        running    3
job1        running    1
job3        pending    1

Once job4 is completed, it too is removed from the queue, making room for job3. The remaining jobs are also run to completion, ultimately leaving our queue empty.

The increasing difficulty of acquiring AI compute capacity has forced AI development teams to reevaluate the processes they use for training AI models. The approach we have demonstrated in this post is to augment the traditional APIs for training models with a custom-made priority queue and an associated job scheduler. Importantly, the proposal we have put forth should be viewed as a general scheme, not as a production-worthy solution. Appropriate modifications and enhancements would be required to address the specific needs of your project.
