Skip to content

Commit

Permalink
Initial implementation of replacement on Spot termination
Browse files Browse the repository at this point in the history
  • Loading branch information
cristim committed Dec 3, 2021
1 parent fdbb634 commit 8dc319c
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 32 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ BUILD := $(DOCKER_IMAGE_VERSION)-$(FLAVOR)-$(SHA)
EXPIRATION := $(shell go run ./scripts/expiration_date.go)
SAVINGS_CUT ?= 5

GOARCH ?= amd64
GOARCH ?= arm64

ifneq ($(FLAVOR), custom)
LICENSE_FILES += BINARY_LICENSE
Expand Down Expand Up @@ -67,7 +67,7 @@ artifacts: ## Create CloudFormation ar
.PHONY: artifacts

docker: ## Build a Docker image, currently only supports x86 hosts
docker build --platform=linux/amd64 --push -t $(DOCKER_IMAGE):$(DOCKER_IMAGE_VERSION) .
docker build --platform=linux/arm64 --push -t $(DOCKER_IMAGE):$(DOCKER_IMAGE_VERSION) .
.PHONY: docker

docker-login:
Expand All @@ -77,7 +77,7 @@ docker-push-artifacts: docker artifacts
.PHONY: docker-push-artifacts

docker-marketplace:
docker build -f Dockerfile.marketplace --platform=linux/amd64 --push -t $(DOCKER_IMAGE):$(DOCKER_IMAGE_VERSION) --build-arg savings_cut=${SAVINGS_CUT} .
docker build -f Dockerfile.marketplace --platform=linux/arm64 --push -t $(DOCKER_IMAGE):$(DOCKER_IMAGE_VERSION) --build-arg savings_cut=${SAVINGS_CUT} .
.PHONY: docker-marketplace

docker-marketplace-push-artifacts: docker-marketplace artifacts
Expand Down
28 changes: 16 additions & 12 deletions core/instance_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ func (i *instance) handleInstanceStates() (bool, error) {
return false, nil
}

// returns an instance ID or error
func (i *instance) launchSpotReplacement() (*string, error) {
return i.launchReplacement("spot")
}

func (i *instance) launchReplacement(replacementLifecycle string) (*string, error) {

ltData, err := i.createLaunchTemplateData()
ltData, err := i.createLaunchTemplateData(replacementLifecycle)

if err != nil {
log.Println("failed to create LaunchTemplate data,", err.Error())
Expand All @@ -61,7 +64,7 @@ func (i *instance) launchSpotReplacement() (*string, error) {
return nil, err
}

cfi := i.createFleetInput(lt, instanceTypes)
cfi := i.createFleetInput(lt, instanceTypes, replacementLifecycle)

resp, err := i.region.services.ec2.CreateFleet(cfi)

Expand All @@ -78,7 +81,8 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)

odInstance, err := i.getSwapCandidate()
if err != nil {
log.Printf("Couldn't find suitable OnDemand swap candidate: %s", err.Error())
log.Printf("Couldn't find suitable OnDemand swap candidate, terminating Spot instance: %s", *i.InstanceId)
i.terminate()
return nil, err
}

Expand All @@ -95,7 +99,7 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
defer asg.setAutoScalingMaxSize(maxSize)
}

log.Printf("Attaching spot instance %s to the group %s",
log.Printf("Attaching %s instance %s to the group %s", *i.InstanceLifecycle,
*i.InstanceId, asg.name)
err = asg.attachSpotInstance(*i.InstanceId, true)

Expand All @@ -106,12 +110,12 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
return nil, fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId)
}

log.Printf("Terminating on-demand instance %s from the group %s",
log.Printf("Terminating instance %s from the group %s",
*odInstance.InstanceId, asg.name)
if err := asg.terminateInstanceInAutoScalingGroup(odInstance.Instance.InstanceId, true, true); err != nil {
log.Printf("On-demand instance %s couldn't be terminated, re-trying...",
log.Printf("instance %s couldn't be terminated, re-trying...",
*odInstance.InstanceId)
return nil, fmt.Errorf("couldn't terminate on-demand instance %s",
return nil, fmt.Errorf("couldn't terminate instance %s",
*odInstance.InstanceId)
}

Expand All @@ -121,23 +125,23 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error)
func (i *instance) getSwapCandidate() (*instance, error) {
odInstanceID := i.getReplacementTargetInstanceID()
if odInstanceID == nil {
log.Println("Couldn't find target on-demand instance of", *i.InstanceId)
log.Println("Couldn't find target instance of", *i.InstanceId)
return nil, fmt.Errorf("couldn't find target instance for %s", *i.InstanceId)
}

if err := i.region.scanInstance(odInstanceID); err != nil {
log.Printf("Couldn't describe the target on-demand instance %s", *odInstanceID)
log.Printf("Couldn't describe the target instance %s", *odInstanceID)
return nil, fmt.Errorf("target instance %s couldn't be described", *odInstanceID)
}

odInstance := i.region.instances.get(*odInstanceID)
if odInstance == nil {
log.Printf("Target on-demand instance %s couldn't be found", *odInstanceID)
log.Printf("Target instance %s couldn't be found", *odInstanceID)
return nil, fmt.Errorf("target instance %s is missing", *odInstanceID)
}

if !odInstance.shouldBeReplacedWithSpot() {
log.Printf("Target on-demand instance %s shouldn't be replaced", *odInstanceID)
log.Printf("Target instance %s shouldn't be replaced", *odInstanceID)
i.terminate()
return nil, fmt.Errorf("target instance %s should not be replaced with spot",
*odInstanceID)
Expand Down
33 changes: 25 additions & 8 deletions core/instance_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,13 @@ func (i *instance) processImageBlockDevices(rii *ec2.RequestLaunchTemplateData)
rii.BlockDeviceMappings = i.convertImageBlockDeviceMappings(resp.Images[0].BlockDeviceMappings)
}

func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, error) {
i.price = i.typeInfo.pricing.onDemand * i.asg.config.OnDemandPriceMultiplier
func (i *instance) createLaunchTemplateData(instanceLifecycle string) (*ec2.RequestLaunchTemplateData, error) {
odPrice := i.typeInfo.pricing.onDemand
mp := 1.0
if i.asg != nil && i.asg.config.OnDemandPriceMultiplier != 0 {
mp = i.asg.config.OnDemandPriceMultiplier
}
i.price = odPrice * mp

placement := ec2.LaunchTemplatePlacementRequest(*i.Placement)

Expand All @@ -430,11 +435,14 @@ func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, e

ltData.EbsOptimized = i.EbsOptimized

ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
MarketType: aws.String(Spot),
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
},
if instanceLifecycle == "spot" {

ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{
MarketType: aws.String(Spot),
SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{
MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)),
},
}
}

ltData.Placement = &placement
Expand Down Expand Up @@ -467,7 +475,7 @@ func (i *instance) createFleetLaunchTemplate(ltData *ec2.RequestLaunchTemplateDa
return &ltName, err
}

func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec2.CreateFleetInput {
func (i *instance) createFleetInput(ltName *string, instanceTypes []*string, lifeCycle string) *ec2.CreateFleetInput {

var overrides []*ec2.FleetLaunchTemplateOverridesRequest

Expand All @@ -494,13 +502,22 @@ func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String(i.asg.config.SpotAllocationStrategy),
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
Type: aws.String("instant"),
TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
SpotTargetCapacity: aws.Int64(1),
TotalTargetCapacity: aws.Int64(1),
DefaultTargetCapacityType: aws.String("spot"),
},
}
if lifeCycle != "spot" {
log.Printf("Overriding default capacity type to ondemand\n")
retval.TargetCapacitySpecification.DefaultTargetCapacityType = aws.String("on-demand")
retval.TargetCapacitySpecification.SpotTargetCapacity = aws.Int64(0)
retval.TargetCapacitySpecification.OnDemandTargetCapacity = aws.Int64(1)
}
return retval
}

Expand Down
10 changes: 8 additions & 2 deletions core/instance_conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func Test_instance_createLaunchTemplateData(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got, _ := tt.inst.createLaunchTemplateData()
got, _ := tt.inst.createLaunchTemplateData("spot")

// make sure the lists of tags are sorted, otherwise the comparison fails
sort.Slice(got.TagSpecifications[0].Tags, func(i, j int) bool {
Expand Down Expand Up @@ -1586,6 +1586,9 @@ func Test_instance_createFleetInput(t *testing.T) {
},
},
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String("capacity-optimized-prioritized"),
},
Expand Down Expand Up @@ -1634,6 +1637,9 @@ func Test_instance_createFleetInput(t *testing.T) {
},
},
},
OnDemandOptions: &ec2.OnDemandOptionsRequest{
AllocationStrategy: aws.String("prioritized"),
},
SpotOptions: &ec2.SpotOptionsRequest{
AllocationStrategy: aws.String("capacity-optimized"),
},
Expand All @@ -1650,7 +1656,7 @@ func Test_instance_createFleetInput(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes)
got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes, "spot")

if !reflect.DeepEqual(got, tt.want) {
t.Errorf("instance.createFleetInput() = %v, want %v", got, tt.want)
Expand Down
5 changes: 2 additions & 3 deletions core/instance_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ func (i *instance) canTerminate() bool {
func (i *instance) shouldBeReplacedWithSpot() bool {
protT, _ := i.isProtectedFromTermination()
return i.belongsToEnabledASG() &&
i.asgNeedsReplacement() &&
!i.isSpot() &&
(i.isSpot() || i.asgNeedsReplacement()) &&
!i.isProtectedFromScaleIn() &&
!protT
}
Expand All @@ -121,7 +120,7 @@ func (i *instance) belongsToEnabledASG() bool {
asg.loadLaunchTemplate()
i.asg = &asg
i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier
log.Printf("%s instace %s belongs to enabled ASG %s", i.region.name,
log.Printf("%s instance %s belongs to enabled ASG %s", i.region.name,
*i.InstanceId, i.asg.name)
return true
}
Expand Down
93 changes: 89 additions & 4 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,15 @@ func (a *AutoSpotting) processEventInstance(eventType string, region string, ins
spotTermination := newSpotTermination(region)

if spotTermination.IsInAutoSpottingASG(instanceID, a.config.TagFilteringMode, a.config.FilterByTags) {
err := spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
newInstance, err := a.replaceTerminatingSpotInstance(*instanceID, region)
if err == nil {
log.Printf("Launched replacement instance for instance %s: %s\n", *instanceID, *newInstance)
return nil
}

log.Printf("Error launching replacement instance for instance %s: %s, continued to handle its Spot termination\n", *instanceID, err.Error())

err = spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType)
if err != nil {
log.Printf("Error executing spot termination/rebalance action: %s\n", err.Error())
return err
Expand Down Expand Up @@ -438,10 +446,13 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str
}

// Try OnDemand
if err := a.handleNewOnDemandInstanceLaunch(r, i); err != nil {
if err := a.handleNewOnDemandInstanceLaunch(r, i); !i.isSpot() && err != nil {
log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)
return err
}

log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId)

// Try Spot
// in case we're not triggered by SQS event we do nothing, onDemand event already manage launched spot instance
if len(a.config.sqsReceiptHandle) > 0 {
Expand Down Expand Up @@ -511,9 +522,9 @@ func (a *AutoSpotting) handleNewOnDemandInstanceLaunch(r *region, i *instance) e
}

} else {
log.Printf("%s skipping instance %s: either doesn't belong to an "+
log.Printf("%s skipping %s instance %s: either doesn't belong to an "+
"enabled ASG or should not be replaced with spot, ",
i.region.name, *i.InstanceId)
i.region.name, *i.InstanceLifecycle, *i.InstanceId)
debug.Printf("%#v", i)
}
return nil
Expand Down Expand Up @@ -551,3 +562,77 @@ func (a *AutoSpotting) handleNewSpotInstanceLaunch(r *region, i *instance) error
}
return nil
}

func (a *AutoSpotting) replaceTerminatingSpotInstance(instanceID, regionName string) (*string, error) {
r := &region{name: regionName, conf: a.config, services: connections{}}

if !r.enabled() {
return nil, fmt.Errorf("region %s is not enabled", regionName)
}

r.services.connect(regionName, a.config.MainRegion)
r.setupAsgFilters()
r.scanForEnabledAutoScalingGroups()

log.Println("Scanning full instance information in", r.name)
r.determineInstanceTypeInformation(r.conf)

if err := r.scanInstance(aws.String(instanceID)); err != nil {
log.Printf("%s Couldn't scan instance %s: %s", regionName,
instanceID, err.Error())
return nil, err
}

i := r.instances.get(instanceID)
if i == nil {
log.Printf("%s Instance %s is missing, skipping...",
regionName, instanceID)
return nil, errors.New("instance missing")
}
log.Printf("%s Found instance %s in state %s",
i.region.name, *i.InstanceId, *i.State.Name)

if *i.State.Name != "running" {
log.Printf("%s Instance %s is not in the running state",
i.region.name, *i.InstanceId)
return nil, errors.New("instance not in running state")
}

asgName := i.getReplacementTargetASGName()

if asgName == nil {
log.Printf("Missing the ASG name tag\n")
return nil, errors.New("missing ASG name tag")
}

i.asg = i.region.findEnabledASGByName(*asgName)
i.asg.scanInstances()
i.asg.loadDefaultConfig()
i.asg.loadConfigFromTags()
i.asg.loadLaunchConfiguration()
i.asg.loadLaunchTemplate()

newInstanceID, err := i.launchSpotReplacement()
if err != nil {
fmt.Printf("Spot Instance launch failed while replacing %s, error: %s, falling back to on-demand\n", *i.InstanceId, err.Error())

newInstanceID, err = i.launchReplacement("on-demand")
if err != nil {
fmt.Printf("Instance launch failed while replacing %s, error: %s\n", *i.InstanceId, err.Error())
return nil, err
}
}

i.region.scanInstances()
newInstance := i.region.instances.get(*newInstanceID)

newInstance.swapWithGroupMember(i.asg)

if err = i.asg.waitForInstanceStatus(newInstanceID, "InService", 5); err != nil {
log.Printf("Instance %s is still not InService, trying to terminate it.",
*newInstanceID)
newInstance.terminate()
}

return newInstanceID, nil
}

0 comments on commit 8dc319c

Please sign in to comment.