diff --git a/core/instance_actions.go b/core/instance_actions.go index eba52643..a5bc4634 100644 --- a/core/instance_actions.go +++ b/core/instance_actions.go @@ -32,10 +32,13 @@ func (i *instance) handleInstanceStates() (bool, error) { return false, nil } -// returns an instance ID or error func (i *instance) launchSpotReplacement() (*string, error) { + return i.launchReplacement("spot") +} + +func (i *instance) launchReplacement(replacementLifecycle string) (*string, error) { - ltData, err := i.createLaunchTemplateData() + ltData, err := i.createLaunchTemplateData(replacementLifecycle) if err != nil { log.Println("failed to create LaunchTemplate data,", err.Error()) @@ -61,7 +64,7 @@ func (i *instance) launchSpotReplacement() (*string, error) { return nil, err } - cfi := i.createFleetInput(lt, instanceTypes) + cfi := i.createFleetInput(lt, instanceTypes, replacementLifecycle) resp, err := i.region.services.ec2.CreateFleet(cfi) @@ -78,7 +81,8 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error) odInstance, err := i.getSwapCandidate() if err != nil { - log.Printf("Couldn't find suitable OnDemand swap candidate: %s", err.Error()) + log.Printf("Couldn't find suitable OnDemand swap candidate, terminating Spot instance: %s", *i.InstanceId) + i.terminate() return nil, err } @@ -95,7 +99,7 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error) defer asg.setAutoScalingMaxSize(maxSize) } - log.Printf("Attaching spot instance %s to the group %s", + log.Printf("Attaching %s instance %s to the group %s", *i.InstanceLifecycle, *i.InstanceId, asg.name) err = asg.attachSpotInstance(*i.InstanceId, true) @@ -106,12 +110,12 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error) return nil, fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId) } - log.Printf("Terminating on-demand instance %s from the group %s", + log.Printf("Terminating instance %s from the group %s", 
*odInstance.InstanceId, asg.name) if err := asg.terminateInstanceInAutoScalingGroup(odInstance.Instance.InstanceId, true, true); err != nil { - log.Printf("On-demand instance %s couldn't be terminated, re-trying...", + log.Printf("instance %s couldn't be terminated, re-trying...", *odInstance.InstanceId) - return nil, fmt.Errorf("couldn't terminate on-demand instance %s", + return nil, fmt.Errorf("couldn't terminate instance %s", *odInstance.InstanceId) } @@ -121,23 +125,23 @@ func (i *instance) swapWithGroupMember(asg *autoScalingGroup) (*instance, error) func (i *instance) getSwapCandidate() (*instance, error) { odInstanceID := i.getReplacementTargetInstanceID() if odInstanceID == nil { - log.Println("Couldn't find target on-demand instance of", *i.InstanceId) + log.Println("Couldn't find target instance of", *i.InstanceId) return nil, fmt.Errorf("couldn't find target instance for %s", *i.InstanceId) } if err := i.region.scanInstance(odInstanceID); err != nil { - log.Printf("Couldn't describe the target on-demand instance %s", *odInstanceID) + log.Printf("Couldn't describe the target instance %s", *odInstanceID) return nil, fmt.Errorf("target instance %s couldn't be described", *odInstanceID) } odInstance := i.region.instances.get(*odInstanceID) if odInstance == nil { - log.Printf("Target on-demand instance %s couldn't be found", *odInstanceID) + log.Printf("Target instance %s couldn't be found", *odInstanceID) return nil, fmt.Errorf("target instance %s is missing", *odInstanceID) } if !odInstance.shouldBeReplacedWithSpot() { - log.Printf("Target on-demand instance %s shouldn't be replaced", *odInstanceID) + log.Printf("Target instance %s shouldn't be replaced", *odInstanceID) i.terminate() return nil, fmt.Errorf("target instance %s should not be replaced with spot", *odInstanceID) diff --git a/core/instance_conversion.go b/core/instance_conversion.go index 190701ba..832cdb95 100644 --- a/core/instance_conversion.go +++ b/core/instance_conversion.go @@ -403,8 
+403,13 @@ func (i *instance) processImageBlockDevices(rii *ec2.RequestLaunchTemplateData) rii.BlockDeviceMappings = i.convertImageBlockDeviceMappings(resp.Images[0].BlockDeviceMappings) } -func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, error) { - i.price = i.typeInfo.pricing.onDemand * i.asg.config.OnDemandPriceMultiplier +func (i *instance) createLaunchTemplateData(instanceLifecycle string) (*ec2.RequestLaunchTemplateData, error) { + odPrice := i.typeInfo.pricing.onDemand + mp := 1.0 + if i.asg != nil && i.asg.config.OnDemandPriceMultiplier != 0 { + mp = i.asg.config.OnDemandPriceMultiplier + } + i.price = odPrice * mp placement := ec2.LaunchTemplatePlacementRequest(*i.Placement) @@ -430,11 +435,14 @@ func (i *instance) createLaunchTemplateData() (*ec2.RequestLaunchTemplateData, e ltData.EbsOptimized = i.EbsOptimized - ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{ - MarketType: aws.String(Spot), - SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{ - MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)), - }, + if instanceLifecycle == "spot" { + + ltData.InstanceMarketOptions = &ec2.LaunchTemplateInstanceMarketOptionsRequest{ + MarketType: aws.String(Spot), + SpotOptions: &ec2.LaunchTemplateSpotMarketOptionsRequest{ + MaxPrice: aws.String(strconv.FormatFloat(i.price, 'g', 10, 64)), + }, + } } ltData.Placement = &placement @@ -467,7 +475,7 @@ func (i *instance) createFleetLaunchTemplate(ltData *ec2.RequestLaunchTemplateDa return &ltName, err } -func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec2.CreateFleetInput { +func (i *instance) createFleetInput(ltName *string, instanceTypes []*string, lifeCycle string) *ec2.CreateFleetInput { var overrides []*ec2.FleetLaunchTemplateOverridesRequest @@ -494,6 +502,9 @@ func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec SpotOptions: &ec2.SpotOptionsRequest{ AllocationStrategy: 
aws.String(i.asg.config.SpotAllocationStrategy), }, + OnDemandOptions: &ec2.OnDemandOptionsRequest{ + AllocationStrategy: aws.String("prioritized"), + }, Type: aws.String("instant"), TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ SpotTargetCapacity: aws.Int64(1), @@ -501,6 +512,12 @@ func (i *instance) createFleetInput(ltName *string, instanceTypes []*string) *ec DefaultTargetCapacityType: aws.String("spot"), }, } + if lifeCycle != "spot" { + log.Printf("Overriding default capacity type to ondemand\n") + retval.TargetCapacitySpecification.DefaultTargetCapacityType = aws.String("on-demand") + retval.TargetCapacitySpecification.SpotTargetCapacity = aws.Int64(0) + retval.TargetCapacitySpecification.OnDemandTargetCapacity = aws.Int64(1) + } return retval } diff --git a/core/instance_conversion_test.go b/core/instance_conversion_test.go index b712e8e2..f176af64 100644 --- a/core/instance_conversion_test.go +++ b/core/instance_conversion_test.go @@ -1053,7 +1053,7 @@ func Test_instance_createLaunchTemplateData(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, _ := tt.inst.createLaunchTemplateData() + got, _ := tt.inst.createLaunchTemplateData("spot") // make sure the lists of tags are sorted, otherwise the comparison fails sort.Slice(got.TagSpecifications[0].Tags, func(i, j int) bool { @@ -1586,6 +1586,9 @@ func Test_instance_createFleetInput(t *testing.T) { }, }, }, + OnDemandOptions: &ec2.OnDemandOptionsRequest{ + AllocationStrategy: aws.String("prioritized"), + }, SpotOptions: &ec2.SpotOptionsRequest{ AllocationStrategy: aws.String("capacity-optimized-prioritized"), }, @@ -1634,6 +1637,9 @@ func Test_instance_createFleetInput(t *testing.T) { }, }, }, + OnDemandOptions: &ec2.OnDemandOptionsRequest{ + AllocationStrategy: aws.String("prioritized"), + }, SpotOptions: &ec2.SpotOptionsRequest{ AllocationStrategy: aws.String("capacity-optimized"), }, @@ -1650,7 +1656,7 @@ func Test_instance_createFleetInput(t 
*testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes) + got := tt.i.createFleetInput(tt.ltName, tt.instanceTypes, "spot") if !reflect.DeepEqual(got, tt.want) { t.Errorf("instance.createFleetInput() = %v, want %v", got, tt.want) diff --git a/core/instance_queries.go b/core/instance_queries.go index 73db257c..033921c1 100644 --- a/core/instance_queries.go +++ b/core/instance_queries.go @@ -97,8 +97,7 @@ func (i *instance) canTerminate() bool { func (i *instance) shouldBeReplacedWithSpot() bool { protT, _ := i.isProtectedFromTermination() return i.belongsToEnabledASG() && - i.asgNeedsReplacement() && - !i.isSpot() && + (i.isSpot() || i.asgNeedsReplacement()) && !i.isProtectedFromScaleIn() && !protT } @@ -121,7 +120,7 @@ func (i *instance) belongsToEnabledASG() bool { asg.loadLaunchTemplate() i.asg = &asg i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier - log.Printf("%s instace %s belongs to enabled ASG %s", i.region.name, + log.Printf("%s instance %s belongs to enabled ASG %s", i.region.name, *i.InstanceId, i.asg.name) return true } diff --git a/core/main.go b/core/main.go index 9fd33970..d51d7bac 100644 --- a/core/main.go +++ b/core/main.go @@ -260,7 +260,15 @@ func (a *AutoSpotting) processEventInstance(eventType string, region string, ins spotTermination := newSpotTermination(region) if spotTermination.IsInAutoSpottingASG(instanceID, a.config.TagFilteringMode, a.config.FilterByTags) { - err := spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType) + newInstance, err := a.replaceTerminatingSpotInstance(*instanceID, region) + if err == nil { + log.Printf("Launched replacement instance for instance %s: %s\n", *instanceID, *newInstance) + return nil + } + + log.Printf("Error launching replacement instance for instance %s: %s, continued to handle its Spot termination\n", *instanceID, 
err.Error()) + + err = spotTermination.executeAction(instanceID, a.config.TerminationNotificationAction, eventType) if err != nil { log.Printf("Error executing spot termination/rebalance action: %s\n", err.Error()) return err @@ -438,10 +446,13 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str } // Try OnDemand - if err := a.handleNewOnDemandInstanceLaunch(r, i); err != nil { + if err := a.handleNewOnDemandInstanceLaunch(r, i); !i.isSpot() && err != nil { + log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId) return err } + log.Printf("%s Instance %s couldn't be handled as on-demand instance", i.region.name, *i.InstanceId) + // Try Spot // in case we're not triggered by SQS event we do nothing, onDemand event already manage launched spot instance if len(a.config.sqsReceiptHandle) > 0 { @@ -511,9 +522,9 @@ func (a *AutoSpotting) handleNewOnDemandInstanceLaunch(r *region, i *instance) e } } else { - log.Printf("%s skipping instance %s: either doesn't belong to an "+ + log.Printf("%s skipping %s instance %s: either doesn't belong to an "+ "enabled ASG or should not be replaced with spot, ", - i.region.name, *i.InstanceId) + i.region.name, *i.InstanceLifecycle, *i.InstanceId) debug.Printf("%#v", i) } return nil @@ -551,3 +562,77 @@ func (a *AutoSpotting) handleNewSpotInstanceLaunch(r *region, i *instance) error } return nil } + +func (a *AutoSpotting) replaceTerminatingSpotInstance(instanceID, regionName string) (*string, error) { + r := &region{name: regionName, conf: a.config, services: connections{}} + + if !r.enabled() { + return nil, fmt.Errorf("region %s is not enabled", regionName) + } + + r.services.connect(regionName, a.config.MainRegion) + r.setupAsgFilters() + r.scanForEnabledAutoScalingGroups() + + log.Println("Scanning full instance information in", r.name) + r.determineInstanceTypeInformation(r.conf) + + if err := r.scanInstance(aws.String(instanceID)); err != nil { + 
log.Printf("%s Couldn't scan instance %s: %s", regionName, + instanceID, err.Error()) + return nil, err + } + + i := r.instances.get(instanceID) + if i == nil { + log.Printf("%s Instance %s is missing, skipping...", + regionName, instanceID) + return nil, errors.New("instance missing") + } + log.Printf("%s Found instance %s in state %s", + i.region.name, *i.InstanceId, *i.State.Name) + + if *i.State.Name != "running" { + log.Printf("%s Instance %s is not in the running state", + i.region.name, *i.InstanceId) + return nil, errors.New("instance not in running state") + } + + asgName := i.getReplacementTargetASGName() + + if asgName == nil { + log.Printf("Missing the ASG name tag\n") + return nil, errors.New("missing ASG name tag") + } + + i.asg = i.region.findEnabledASGByName(*asgName) + i.asg.scanInstances() + i.asg.loadDefaultConfig() + i.asg.loadConfigFromTags() + i.asg.loadLaunchConfiguration() + i.asg.loadLaunchTemplate() + + newInstanceID, err := i.launchSpotReplacement() + if err != nil { + fmt.Printf("Spot Instance launch failed while replacing %s, error: %s, falling back to on-demand\n", *i.InstanceId, err.Error()) + + newInstanceID, err = i.launchReplacement("on-demand") + if err != nil { + fmt.Printf("Instance launch failed while replacing %s, error: %s\n", *i.InstanceId, err.Error()) + return nil, err + } + } + + i.region.scanInstances() + newInstance := i.region.instances.get(*newInstanceID) + + newInstance.swapWithGroupMember(i.asg) + + if err = i.asg.waitForInstanceStatus(newInstanceID, "InService", 5); err != nil { + log.Printf("Instance %s is still not InService, trying to terminate it.", + *newInstanceID) + newInstance.terminate() + } + + return newInstanceID, nil +}