comparison venv/lib/python2.7/site-packages/boto/emr/connection.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 # Copyright (c) 2010 Spotify AB
2 # Copyright (c) 2010-2011 Yelp
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
9 # persons to whom the Software is furnished to do so, subject to the fol-
10 # lowing conditions:
11 #
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 # IN THE SOFTWARE.
22
23 """
24 Represents a connection to the EMR service
25 """
26 import types
27
28 import boto
29 import boto.utils
30 from boto.ec2.regioninfo import RegionInfo
31 from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \
32 Cluster, ClusterSummaryList, HadoopStep, \
33 InstanceGroupList, InstanceList, JobFlow, \
34 JobFlowStepList, \
35 ModifyInstanceGroupsResponse, \
36 RunJobFlowResponse, StepSummaryList
37 from boto.emr.step import JarStep
38 from boto.connection import AWSQueryConnection
39 from boto.exception import EmrResponseError
40 from boto.compat import six
41
42
class EmrConnection(AWSQueryConnection):
    """
    Connection to the Amazon Elastic MapReduce (EMR) query API.

    The public methods assemble the flat, dotted parameter dictionary the
    query API expects (``Steps.member.1.Name`` and so on) and pass it to
    ``get_object`` / ``get_list`` / ``get_status``, which are not defined
    in this class and are expected to come from the base class.
    """

    # API version, default region name and default endpoint. Each value is
    # looked up in the ``Boto`` section of the boto config, falling back to
    # the literal given here.
    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get(
        'Boto', 'emr_region_endpoint',
        'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging
    DebuggingJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        """
        Create the connection.

        When no ``region`` is supplied, a ``RegionInfo`` is built from
        ``DefaultRegionName`` and ``DefaultRegionEndpoint``. All other
        arguments are forwarded unchanged to the base class constructor.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        """Return the list of auth capabilities needed: ``['hmac-v4']``."""
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest
        """
        params = {
            'ClusterId': cluster_id
        }
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        :return: The first job flow returned by ``describe_jobflows`` for
            this id, or ``None`` when the result is empty.
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]
        return None

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs

        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_id: str
        :param step_id: The step id of interest
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params,
                               BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time

        :type created_before: datetime
        :param created_before: Bound on cluster creation time

        :type cluster_states: list
        :param cluster_states: Bound on cluster states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {}
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states,
                                   'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest

        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypeList.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_states: list
        :param step_states: Filter by step states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStateList.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
                     If you want to create only a tag name, the
                     value for that tag should be the empty string
                     (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove.
        """
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id

        :return: The status returned by ``terminate_jobflows``.
        """
        # Propagate the status rather than discarding it, so the single-id
        # and multi-id variants report the outcome in the same way.
        return self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate one or more Elastic MapReduce job flows

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id

        :type steps: list(boto.emr.Step)
        :param steps: A list of steps to add to the job. A single step
            that is not wrapped in a list is accepted as well.
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {'JobFlowId': jobflow_id}

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
                           new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups to add to the job.
            A single instance group not wrapped in a list is accepted too.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {'JobFlowId': jobflow_id}
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
                                   groups to be modified

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        # NOTE: zip() stops at the shorter list, so surplus ids or sizes
        # are silently ignored when the two lists differ in length.
        instance_groups = zip(instance_group_ids, new_sizes)

        params = {}
        for index, (group_id, size) in enumerate(instance_groups, 1):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            prefix = 'InstanceGroups.member.%d' % index
            params['%s.InstanceGroupId' % prefix] = group_id
            params['%s.InstanceCount' % prefix] = size

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow

        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job. The list passed
            in is never modified; ``None`` (the default) means no steps.

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts. ``None`` (the default) means none.

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Work on a private copy: the debugging step is inserted below and
        # must not leak into the caller's list or into later calls (which
        # is what a shared mutable default argument would cause).
        steps = list(steps) if steps else []

        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
        else:
            # Instance group args (for spot instances or a heterogenous
            # cluster); the helper's keys need the 'Instances.' prefix here.
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
        params.update(instance_params)

        # Debugging step from EMR API docs; it has to run first.
        if enable_debugging:
            debugging_step = JarStep(name='Setup Hadoop Debugging',
                                     action_on_failure='TERMINATE_JOB_FLOW',
                                     main_class=None,
                                     jar=self.DebuggingJar,
                                     step_args=self.DebuggingArgs)
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [
                self._build_bootstrap_action_args(bootstrap_action)
                for bootstrap_action in bootstrap_actions
            ]
            params.update(
                self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        # Raw overrides: a value of None removes a parameter set above.
        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        # The explicit keyword arguments below are applied after
        # api_params, so they win when both specify the same parameter.
        if visible_to_all_users is not None:
            if visible_to_all_users:
                params['VisibleToAllUsers'] = 'true'
            else:
                params['VisibleToAllUsers'] = 'false'

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on the specified Elastic MapReduce
        job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow ID; it is sent as a one-element
            ``JobFlowIds`` list.

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        params = {}
        params['TerminationProtected'] = (
            'true' if termination_protection_status else 'false')
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether the specified Elastic Map Reduce job flow is visible
        to all IAM users

        :type jobflow_id: str
        :param jobflow_id: The job flow ID; it is sent as a one-element
            ``JobFlowIds`` list.

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        params = {}
        params['VisibleToAllUsers'] = 'true' if visibility else 'false'
        self.build_list_params(params, [jobflow_id], 'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    def _build_bootstrap_action_args(self, bootstrap_action):
        """
        Build the un-prefixed request parameters for one bootstrap action
        from its ``path``, optional ``name`` and ``args()``.
        """
        bootstrap_action_params = {}
        bootstrap_action_params['ScriptBootstrapAction.Path'] = \
            bootstrap_action.path

        # The name is optional: objects without a ``name`` attribute are
        # accepted and simply produce no 'Name' entry.
        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args,
                                   'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        """
        Build the un-prefixed request parameters for one step from its
        ``action_on_failure``, ``jar()``, ``main_class()``, ``args()``
        and ``name``.
        """
        step_params = {}
        step_params['ActionOnFailure'] = step.action_on_failure
        step_params['HadoopJarStep.Jar'] = step.jar()

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args,
                                   'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        """
        Prefix each bootstrap action parameter dict with
        ``BootstrapActions.member.<n>.`` (1-based) and merge them into a
        single dict. A single dict not wrapped in a list is accepted.
        """
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for index, bootstrap_action in enumerate(bootstrap_actions, 1):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (index, key)] = value
        return params

    def _build_step_list(self, steps):
        """
        Prefix each step parameter dict with ``Steps.member.<n>.``
        (1-based) and merge them into a single dict. A single dict not
        wrapped in a list is accepted.
        """
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for index, step in enumerate(steps, 1):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (index, key)] = value
        return params

    def _build_string_list(self, field, items):
        """
        Map ``items`` to ``<field>.member.<n>`` keys (1-based). A single
        item not wrapped in a list is accepted.
        """
        if not isinstance(items, list):
            items = [items]

        params = {}
        for index, item in enumerate(items, 1):
            params['%s.member.%s' % (field, index)] = item
        return params

    def _build_tag_list(self, tags):
        """
        Turn a ``{name: value}`` dict into ``Tags.member.<n>.Key`` /
        ``Tags.member.<n>.Value`` parameters, numbered in sorted order.
        """
        assert isinstance(tags, dict)

        params = {}
        for i, key_value in enumerate(sorted(six.iteritems(tags)), start=1):
            key, value = key_value
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            # Falsy values ('' or None) create a name-only tag: no
            # '.Value' entry is sent for them.
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a dict of request
        parameters for use in making a RunJobFlow request.
        """
        params = {
            # Sent as the lower-cased string form, e.g. 'true' / 'false'.
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a dict of request
        parameters for use in making a RunJobFlow request.
        """
        params = {'Instances.MasterInstanceType': master_instance_type,
                  'Instances.SlaveInstanceType': slave_instance_type,
                  'Instances.InstanceCount': num_instances}
        return params

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests.
        """
        params = {'InstanceCount': instance_group.num_instances,
                  'InstanceRole': instance_group.role,
                  'InstanceType': instance_group.type,
                  'Name': instance_group.name,
                  'Market': instance_group.market}
        # A bid price is only included for spot-market groups.
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a dict of request parameters, keyed
        ``InstanceGroups.member.<n>.<field>`` (1-based), for use in making
        a RunJobFlow or AddInstanceGroups request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for index, instance_group in enumerate(instance_groups, 1):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (index, key)] = value
        return params