Mercurial > repos > bcclaywell > argo_navis
comparison venv/lib/python2.7/site-packages/boto/emr/connection.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author | bcclaywell |
---|---|
date | Mon, 12 Oct 2015 17:43:33 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d67268158946 |
---|---|
1 # Copyright (c) 2010 Spotify AB | |
2 # Copyright (c) 2010-2011 Yelp | |
3 # | |
4 # Permission is hereby granted, free of charge, to any person obtaining a | |
5 # copy of this software and associated documentation files (the | |
6 # "Software"), to deal in the Software without restriction, including | |
7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
9 # persons to whom the Software is furnished to do so, subject to the fol- | |
10 # lowing conditions: | |
11 # | |
12 # The above copyright notice and this permission notice shall be included | |
13 # in all copies or substantial portions of the Software. | |
14 # | |
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
21 # IN THE SOFTWARE. | |
22 | |
23 """ | |
24 Represents a connection to the EMR service | |
25 """ | |
26 import types | |
27 | |
28 import boto | |
29 import boto.utils | |
30 from boto.ec2.regioninfo import RegionInfo | |
31 from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \ | |
32 Cluster, ClusterSummaryList, HadoopStep, \ | |
33 InstanceGroupList, InstanceList, JobFlow, \ | |
34 JobFlowStepList, \ | |
35 ModifyInstanceGroupsResponse, \ | |
36 RunJobFlowResponse, StepSummaryList | |
37 from boto.emr.step import JarStep | |
38 from boto.connection import AWSQueryConnection | |
39 from boto.exception import EmrResponseError | |
40 from boto.compat import six | |
41 | |
42 | |
class EmrConnection(AWSQueryConnection):
    """
    Query-API connection to Amazon Elastic MapReduce (EMR).

    Wraps the cluster-oriented actions (DescribeCluster, ListClusters,
    ListSteps, ...) and the job-flow actions (DescribeJobFlows, RunJobFlow,
    TerminateJobFlows, ...). The private ``_build_*`` helpers flatten boto
    EMR objects (steps, bootstrap actions, instance groups, tags) into the
    ``Prefix.member.N.Field`` query parameters the API expects.
    """

    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging: jar and argument of the
    # "Setup Hadoop Debugging" step that run_jobflow() prepends when
    # enable_debugging is set.
    DebuggingJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3n://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        """
        Create an EMR connection.

        When ``region`` is not given, a RegionInfo built from
        ``DefaultRegionName`` / ``DefaultRegionEndpoint`` is used. All other
        arguments are forwarded to ``AWSQueryConnection``.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        """Request HMAC v4 (SigV4) request signing."""
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :rtype: :class:`boto.emr.emrobject.Cluster`
        """
        params = {
            'ClusterId': cluster_id
        }
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        :rtype: :class:`boto.emr.emrobject.JobFlow` or None
        :return: The first job flow in the response, or None if the
            response contained none.
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]
        return None

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs

        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time

        :rtype: list of :class:`boto.emr.emrobject.JobFlow`
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_id: str
        :param step_id: The step id of interest

        :rtype: :class:`boto.emr.emrobject.HadoopStep`
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params, BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time

        :type created_before: datetime
        :param created_before: Bound on cluster creation time

        :type cluster_states: list
        :param cluster_states: Bound on cluster states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {}
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states, 'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest

        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypeList.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_states: list
        :param step_states: Filter by step states

        :type marker: str
        :param marker: Pagination marker
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStateList.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
            If you want to create only a tag name, the
            value for that tag should be the empty string
            (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove.
        """
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id

        :return: The status returned by :meth:`terminate_jobflows`.
        """
        # Return the status (previously discarded) for consistency with
        # terminate_jobflows().
        return self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate one or more Elastic MapReduce job flows

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id

        :type steps: list(boto.emr.Step)
        :param steps: A list of steps (or a single step) to add to the job

        :rtype: :class:`boto.emr.emrobject.JobFlowStepList`
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {}
        params['JobFlowId'] = jobflow_id

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
            new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups (or a single
            instance group) to add to the job

        :rtype: :class:`boto.emr.emrobject.AddInstanceGroupsResponse`
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {}
        params['JobFlowId'] = jobflow_id
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
            groups to be modified

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group,
            paired positionally with ``instance_group_ids``. If the two
            lists differ in length, the extra entries of the longer one
            are ignored (``zip`` semantics).

        :rtype: :class:`boto.emr.emrobject.ModifyInstanceGroupsResponse`
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        params = {}
        pairs = zip(instance_group_ids, new_sizes)
        for k, (group_id, size) in enumerate(pairs, start=1):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            params['InstanceGroups.member.%d.InstanceGroupId' % k] = group_id
            params['InstanceGroups.member.%d.InstanceCount' % k] = size

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow

        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled. When set, a "Setup Hadoop Debugging" step
            is prepended to the submitted steps; the caller's ``steps``
            list is not modified.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job (default: none)

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts (default: none).

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None. These are applied last, so they take
            precedence over every parameter built from the other arguments.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Work on a private copy: the debugging step is inserted below, and
        # mutating the caller's list (or a shared default) would leak extra
        # debugging steps into later calls.
        steps = list(steps) if steps else []

        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
            params.update(instance_params)
        else:
            # Instance group args (for spot instances or a heterogenous cluster)
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
            params.update(instance_params)

        # Debugging step from EMR API docs
        if enable_debugging:
            debugging_step = JarStep(name='Setup Hadoop Debugging',
                                     action_on_failure='TERMINATE_JOB_FLOW',
                                     main_class=None,
                                     jar=self.DebuggingJar,
                                     step_args=self.DebuggingArgs)
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [
                self._build_bootstrap_action_args(bootstrap_action)
                for bootstrap_action in bootstrap_actions
            ]
            params.update(self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        if visible_to_all_users is not None:
            params['VisibleToAllUsers'] = (
                'true' if visible_to_all_users else 'false')

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        # Raw API overrides go last so they can replace or delete (None)
        # any parameter built above, as documented.
        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on specified Elastic MapReduce job flows

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID, or a list/tuple of IDs

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        params = {}
        params['TerminationProtected'] = (
            'true' if termination_protection_status else 'false')
        self.build_list_params(params, self._as_id_list(jobflow_id),
                               'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether specified Elastic Map Reduce job flows are visible to all IAM users

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID, or a list/tuple of IDs

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        params = {}
        params['VisibleToAllUsers'] = 'true' if visibility else 'false'
        self.build_list_params(params, self._as_id_list(jobflow_id),
                               'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    @staticmethod
    def _as_id_list(ids):
        """
        Normalize a single id, or a list/tuple of ids, into a list.

        Lists and tuples are copied into a new list; any other value is
        wrapped as a one-element list.
        """
        if isinstance(ids, (list, tuple)):
            return list(ids)
        return [ids]

    def _build_bootstrap_action_args(self, bootstrap_action):
        """
        Flatten one bootstrap action into a dict of un-prefixed query params
        (``ScriptBootstrapAction.Path``, optional ``Name``, and
        ``ScriptBootstrapAction.Args.member.N``).
        """
        bootstrap_action_params = {}
        bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path

        # Name is optional: actions without a ``name`` attribute omit it.
        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args,
                                   'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        """
        Flatten one step into a dict of un-prefixed query params
        (``ActionOnFailure``, ``HadoopJarStep.*`` and ``Name``).
        """
        step_params = {}
        step_params['ActionOnFailure'] = step.action_on_failure
        step_params['HadoopJarStep.Jar'] = step.jar()

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        """
        Prefix each bootstrap-action param dict as
        ``BootstrapActions.member.<1-based index>.<key>`` and merge them.
        """
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for i, bootstrap_action in enumerate(bootstrap_actions, start=1):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (i, key)] = value
        return params

    def _build_step_list(self, steps):
        """
        Prefix each step param dict as ``Steps.member.<1-based index>.<key>``
        and merge them.
        """
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for i, step in enumerate(steps, start=1):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (i, key)] = value
        return params

    def _build_string_list(self, field, items):
        """
        Build ``<field>.member.<1-based index>`` params from a list of
        items (a non-list value is treated as a one-element list).
        """
        if not isinstance(items, list):
            items = [items]

        params = {}
        for i, item in enumerate(items, start=1):
            params['%s.member.%s' % (field, i)] = item
        return params

    def _build_tag_list(self, tags):
        """
        Build ``Tags.member.N.Key`` / ``Tags.member.N.Value`` params from a
        dict, numbering entries in sorted order. Falsy values ('' or None)
        produce a Key with no Value.
        """
        assert isinstance(tags, dict)

        params = {}
        for i, (key, value) in enumerate(sorted(six.iteritems(tags)), start=1):
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a comparable dict for
        use in making a RunJobFlow request.
        """
        params = {
            # The API expects the lowercase strings 'true' / 'false'.
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a comparable dict
        for use in making a RunJobFlow request.
        """
        params = {'Instances.MasterInstanceType': master_instance_type,
                  'Instances.SlaveInstanceType': slave_instance_type,
                  'Instances.InstanceCount': num_instances}
        return params

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests. ``BidPrice`` is only
        included for the 'SPOT' market.
        """
        params = {'InstanceCount': instance_group.num_instances,
                  'InstanceRole': instance_group.role,
                  'InstanceType': instance_group.type,
                  'Name': instance_group.name,
                  'Market': instance_group.market}
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
        request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for i, instance_group in enumerate(instance_groups, start=1):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (i, key)] = value
        return params