EMR — Spark in AWS — Fleet Provisioning

Siddharth Malani
Jun 21, 2019

In AWS EMR we ran into problems provisioning large instances. This usually happened at peak times and on weekends, especially bank holiday weekends.

We looked at various options, and Fleet Provisioning seemed very attractive. But, as with much of AWS, the documentation is cryptic and hard to follow, so I thought it worth sharing what I decoded, with a few examples.

The way the fleet works is quite simple.

  1. In EMR there are three node types: MASTER, CORE and TASK.
  2. Each node type can have one fleet. Let's call them the Master Fleet, Core Fleet and Task Fleet.
  3. Each fleet can list up to 5 instance type options, e.g. m4.4xlarge, m4.2xlarge, m5.large etc.
  4. For each fleet you set a target weight (capacity), and for each instance type in that fleet you set a weight. For example, for the Core or Task Fleet you might set a total target weight of 10, with m4.4xlarge => 4, m4.2xlarge => 2 and m4.xlarge => 1.
  5. The Master Fleet target can only be 1, as only one master is provisioned. You can still list up to 5 instance types, but each one must have a weight of 1.
  6. When the EMR cluster is provisioned, AWS adds instances until the fleet reaches its target, using whichever of the listed instance types are available.
  7. It may exceed the target: if no instance type small enough to hit the target exactly is available, it provisions the next available size instead.
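To make the fill-and-overshoot behaviour concrete, here is a toy Java model. It is a hypothetical greedy strategy (largest type that still fits, otherwise the smallest available type), not EMR's actual allocation logic. The `available` counts are made-up inputs standing in for whatever capacity AWS happens to have at the time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical input: an instance type, its weight, and how many instances of
// that type are (in this toy model) available when the fleet is provisioned.
record Option(String type, int weight, int available) {}

class FleetSim {

    // Toy greedy fill, NOT EMR's real allocation logic: launch the largest
    // available type whose weight still fits the remaining capacity; if none
    // fits, launch the smallest available type and overshoot the target.
    static Map<String, Integer> fill(int target, List<Option> options) {
        List<Option> opts = new ArrayList<>(options);
        opts.sort((a, b) -> Integer.compare(b.weight(), a.weight())); // largest first
        int[] left = opts.stream().mapToInt(Option::available).toArray();
        Map<String, Integer> launched = new LinkedHashMap<>();
        int capacity = 0;
        while (capacity < target) {
            int remaining = target - capacity;
            int pick = -1;
            // Largest available type that fits without exceeding the target.
            for (int i = 0; i < opts.size() && pick < 0; i++) {
                if (left[i] > 0 && opts.get(i).weight() <= remaining) pick = i;
            }
            // Nothing fits exactly: take the smallest type still available.
            for (int i = opts.size() - 1; i >= 0 && pick < 0; i--) {
                if (left[i] > 0) pick = i;
            }
            if (pick < 0) break; // nothing available at all: target not reached
            left[pick]--;
            capacity += opts.get(pick).weight();
            launched.merge(opts.get(pick).type(), 1, Integer::sum);
        }
        return launched;
    }
}
```

With one m4.4xlarge, two m4.2xlarge and two m4.xlarge available and a target of 10, this model lands exactly on the target. With only m4.4xlarge available, it launches three of them and overshoots to 12.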

Example:

Suppose I need a core fleet with a total of 40 vCPUs (the other node types are left out of this example for simplicity).

Fleet:

m4.4xlarge (16 vCPUs, weight 4)

m4.2xlarge (8 vCPUs, weight 2)

m4.xlarge (4 vCPUs, weight 1)

Target: 10 (40 vCPUs, with 1 point = 4 vCPUs)
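The weights and the target both follow from choosing a unit of capacity. Here is a minimal sketch, assuming one unit = 4 vCPUs (one m4.xlarge, the smallest type in this fleet). `WeightCalc` is a hypothetical helper, not part of any AWS SDK.

```java
// Hypothetical helper: derive fleet weights and the target from vCPU counts,
// assuming one capacity unit = 4 vCPUs (the size of an m4.xlarge).
class WeightCalc {
    static final int VCPUS_PER_UNIT = 4;

    // Weight of an instance type = its vCPUs divided by the unit size.
    static int weight(int vcpus) {
        if (vcpus % VCPUS_PER_UNIT != 0) {
            throw new IllegalArgumentException(vcpus + " vCPUs is not a whole number of units");
        }
        return vcpus / VCPUS_PER_UNIT;
    }

    // Target = required vCPUs expressed in units, rounded up so the fleet is never short.
    static int target(int requiredVcpus) {
        return (requiredVcpus + VCPUS_PER_UNIT - 1) / VCPUS_PER_UNIT;
    }
}
```

Here `weight(16)`, `weight(8)` and `weight(4)` give 4, 2 and 1, and `target(40)` gives 10, matching the fleet above.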

Scenario 1: AWS provisioning (exact):

m4.4xlarge => 4 x 1 instance = 4 points

m4.2xlarge => 2 x 2 instances = 4 points

m4.xlarge => 1 x 2 instances = 2 points

Provisioning COMPLETE! (4 + 4 + 2 = 10 points = 40 vCPUs)

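Scenario 2: AWS provisioning (exceeds target, e.g. when only m4.4xlarge is available):

m4.4xlarge => 4 x 3 instances = 12 points (48 vCPUs; two instances reach only 8 points, so a third is needed)

Provisioning COMPLETE!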
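Here is a quick arithmetic check of both scenarios. Fleet capacity is the sum of weight x instance count, and vCPUs are capacity x 4 under the same 1 point = 4 vCPUs assumption. `ScenarioCheck` is a hypothetical helper for illustration only.

```java
import java.util.Map;

// Hypothetical helper: capacity = sum of (weight x instance count) per type.
class ScenarioCheck {
    // Weights from the example fleet above (unknown types would throw an NPE).
    static final Map<String, Integer> WEIGHTS =
            Map.of("m4.4xlarge", 4, "m4.2xlarge", 2, "m4.xlarge", 1);

    static int capacity(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                .mapToInt(e -> WEIGHTS.get(e.getKey()) * e.getValue())
                .sum();
    }

    // Points beyond the target (0 when the fleet lands exactly on it).
    static int overshoot(int target, Map<String, Integer> counts) {
        return Math.max(0, capacity(counts) - target);
    }
}
```

Scenario 1 gives a capacity of 10 (40 vCPUs) with no overshoot. Scenario 2 gives 12 (48 vCPUs), which is 2 points over the target.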

Here is how I did it:

Config YAML

emr-master-fleet-config:
  targetWeight: 1
  fleetInstanceList:
    - instanceType: m4.4xlarge
      weight: 1
      ebs: 200
    - instanceType: m4.2xlarge
      weight: 1
      ebs: 200
    - instanceType: m4.xlarge
      weight: 1
      ebs: 200
emr-core-fleet-config:
  targetWeight: 16
  fleetInstanceList:
    - instanceType: m4.4xlarge
      weight: 4
      ebs: 200
    - instanceType: m4.2xlarge
      weight: 2
      ebs: 100
    - instanceType: m4.xlarge
      weight: 1
      ebs: 50

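Before wiring the YAML into a cluster request, it can help to sanity-check it against the rules listed earlier. This is a minimal sketch. The records `TypeOption` and `FleetSpec` are hypothetical mirrors of the YAML entries. The checks encode the limits as described in this article: 1 to 5 instance types per fleet, a master target and master weights of exactly 1, and core/task weights of at least 1. Current EMR limits may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of one fleetInstanceList entry.
record TypeOption(String instanceType, int weight, int ebsGb) {}

// Hypothetical mirror of one emr-*-fleet-config block; role is MASTER, CORE or TASK.
record FleetSpec(String role, int targetWeight, List<TypeOption> types) {}

class FleetConfigCheck {
    // Per-fleet instance type limit as described in this article.
    static final int MAX_TYPES = 5;

    // Returns human-readable problems; an empty list means the fleet looks valid.
    static List<String> validate(FleetSpec fleet) {
        List<String> problems = new ArrayList<>();
        boolean master = fleet.role().equals("MASTER");
        if (fleet.types().isEmpty() || fleet.types().size() > MAX_TYPES) {
            problems.add(fleet.role() + ": needs 1 to " + MAX_TYPES + " instance types");
        }
        if (fleet.targetWeight() < 1) {
            problems.add(fleet.role() + ": target must be at least 1");
        }
        if (master && fleet.targetWeight() != 1) {
            problems.add("MASTER: target must be exactly 1");
        }
        for (TypeOption t : fleet.types()) {
            // Master weights must be 1; core/task weights must be 1 or more.
            boolean bad = master ? t.weight() != 1 : t.weight() < 1;
            if (bad) {
                problems.add(fleet.role() + ": invalid weight " + t.weight() + " for " + t.instanceType());
            }
        }
        return problems;
    }
}
```

Both fleets in the YAML above pass these checks. Note also that the core fleet's EBS sizes scale with weight (200/4 = 100/2 = 50/1 = 50 GB per point), so storage per point of capacity stays the same whichever mix EMR ends up provisioning.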
Spring magic to ingest the YAML neatly:

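import java.util.List;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// One entry of fleetInstanceList (each public class goes in its own file).
@Data
public class FleetInstance {

    private String instanceType; // e.g. m4.4xlarge
    private int weight;          // weighted capacity this type contributes
    private int ebs;             // EBS volume size in GB

}

// Binds the emr-master-fleet-config block.
@ConfigurationProperties(prefix = "emr-master-fleet-config")
@Configuration
@Data
public class MasterFleetConfig {

    private int targetWeight;

    private List<FleetInstance> fleetInstanceList;

}

// Binds the emr-core-fleet-config block.
@ConfigurationProperties(prefix = "emr-core-fleet-config")
@Configuration
@Data
public class CoreFleetConfig {

    private int targetWeight;

    private List<FleetInstance> fleetInstanceList;

}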

Code to trigger EMR

import static java.util.Collections.singletonList;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.*;
import org.springframework.beans.factory.annotation.Autowired;

// Inside a Spring bean. emr (an AmazonElasticMapReduce client), emrServiceRole,
// emrInstanceRole, logUri, ec2KeyName and ec2SubnetId are fields defined elsewhere.

@Autowired
private MasterFleetConfig masterFleetConfig;

@Autowired
private CoreFleetConfig coreFleetConfig;

public String startNewCluster() {
    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName("EMR Cluster")
            .withApplications(new Application().withName("Spark"))
            .withReleaseLabel("emr-5.13.0")
            .withServiceRole(emrServiceRole)
            .withJobFlowRole(emrInstanceRole)
            .withVisibleToAllUsers(true)
            .withLogUri(logUri)
            .withInstances(new JobFlowInstancesConfig()
                    .withEc2KeyName(ec2KeyName)
                    .withInstanceFleets(Arrays.asList(
                            // Master fleet: target is always 1 (a single master node).
                            new InstanceFleetConfig()
                                    .withName("MasterFleet")
                                    .withInstanceFleetType(InstanceFleetType.MASTER)
                                    .withTargetOnDemandCapacity(masterFleetConfig.getTargetWeight())
                                    .withInstanceTypeConfigs(
                                            instanceTypes(masterFleetConfig.getFleetInstanceList())),
                            // Core fleet: EMR fills targetWeight from the weighted instance types.
                            new InstanceFleetConfig()
                                    .withName("CoreFleet")
                                    .withInstanceFleetType(InstanceFleetType.CORE)
                                    .withTargetOnDemandCapacity(coreFleetConfig.getTargetWeight())
                                    .withInstanceTypeConfigs(
                                            instanceTypes(coreFleetConfig.getFleetInstanceList()))))
                    // With instance fleets, several subnet IDs may be given; EMR picks one.
                    .withEc2SubnetIds(ec2SubnetId));

    RunJobFlowResult runJobFlowResult = emr.runJobFlow(request);
    return runJobFlowResult.getJobFlowId();
}

// Maps each YAML entry to an InstanceTypeConfig carrying its weight and EBS volume.
private List<InstanceTypeConfig> instanceTypes(List<FleetInstance> fleetInstances) {
    return fleetInstances.stream()
            .map(k -> new InstanceTypeConfig()
                    .withInstanceType(k.getInstanceType())
                    .withWeightedCapacity(k.getWeight())
                    .withEbsConfiguration(ebsConfig(k.getEbs())))
            .collect(Collectors.toList());
}

// One gp2 EBS volume of the given size (GB) attached to each instance.
private EbsConfiguration ebsConfig(int size) {
    return new EbsConfiguration()
            .withEbsBlockDeviceConfigs(singletonList(new EbsBlockDeviceConfig()
                    .withVolumeSpecification(new VolumeSpecification()
                            .withVolumeType("gp2")
                            .withSizeInGB(size))));
}
