comparison venv/lib/python2.7/site-packages/boto/dynamodb2/table.py @ 0:d67268158946 draft
planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
| author | bcclaywell |
|---|---|
| date | Mon, 12 Oct 2015 17:43:33 -0400 |
| parents | |
| children | |
| base revision | new revision |
|---|---|
| -1:000000000000 | 0:d67268158946 |

```python
import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer,
                                  FILTER_OPERATORS, QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within the
    table, while largely-schema-free, share the same schema & are essentially
    namespaced for use in your application. For example, you might have a
    ``users`` table or a ``forums`` table.
    """
    max_batch_get = 100

    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ), local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
        self._dynamizer = Dynamizer()

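    # NOTE: ``use_boolean`` opts this ``Table`` into DynamoDB's native BOOL
    # type by swapping the default ``NonBooleanDynamizer`` (which round-trips
    # booleans as numbers) for the full ``Dynamizer``. A minimal usage sketch,
    # assuming a hypothetical ``users`` table:
    #
    #     >>> users = Table('users')
    #     >>> users.use_boolean()
    #     >>> users.put_item(data={'username': 'johndoe', 'is_active': True})
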
    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will set up a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

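    # NOTE: the shapes consumed by ``_introspect_schema`` above, as returned
    # by DynamoDB's ``DescribeTable`` (values here are illustrative):
    #
    #     raw_schema     = [{'AttributeName': 'username', 'KeyType': 'HASH'},
    #                       {'AttributeName': 'date_joined', 'KeyType': 'RANGE'}]
    #     raw_attributes = [{'AttributeName': 'username', 'AttributeType': 'S'},
    #                       {'AttributeName': 'date_joined', 'AttributeType': 'N'}]
    #
    # ...which introspects to ``[HashKey('username'),
    # RangeKey('date_joined', data_type='N')]``.
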
    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % \
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

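    # NOTE: an illustrative shape consumed by ``_introspect_all_indexes``
    # above:
    #
    #     raw_indexes = [{
    #         'IndexName': 'MostRecentlyJoinedIndex',
    #         'KeySchema': [{'AttributeName': 'username', 'KeyType': 'HASH'},
    #                       {'AttributeName': 'date_joined', 'KeyType': 'RANGE'}],
    #         'Projection': {'ProjectionType': 'KEYS_ONLY'},
    #     }]
    #
    # With the local-index projection map, this introspects to a single
    # ``KeysOnlyIndex`` whose parts are a ``HashKey`` & a ``RangeKey``.
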
    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should specify the index name, which is also
        a dict containing a ``read`` & ``write`` key, both of which
        should have an integer value associated with them. If you are writing
        new code, please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to the update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to the ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to the ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates one or more global indexes in DynamoDB after the table has
        been created.

        Requires a ``global_indexes`` parameter, which should be a dictionary.
        It should map each index name to a dict containing a ``read`` &
        ``write`` key, both of which should have an integer value associated
        with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to the ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method; there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Return whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to check, you can specify
        the key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Look up an entry in DynamoDB. This is mostly backwards compatible
        with boto.dynamodb. Unlike ``get_item``, it takes the hash key and
        range key first, although you may still specify keyword arguments
        instead.

        Also unlike the ``get_item`` command, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), a ``None`` result is returned,
        instead of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank item.

        This is mostly for consistency with boto.dynamodb.
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method; there is no undo.

        To specify the key of the item you'd like to delete, you can specify
        the key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - True if all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only work
                # with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

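    # NOTE: an illustrative mapping performed by ``_build_filters`` above:
    #
    #     {'username__eq': 'johndoe', 'date_joined__gte': 1236451000}
    #
    # ...becomes (with ``QUERY_OPERATORS``):
    #
    #     {
    #         'username': {
    #             'AttributeValueList': [{'S': 'johndoe'}],
    #             'ComparisonOperator': 'EQ',
    #         },
    #         'date_joined': {
    #             'AttributeValueList': [{'N': '1236451000'}],
    #             'ComparisonOperator': 'GTE',
    #         },
    #     }
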
    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You can not query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` parameter of the API
        and sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print res['first_name'] + ' ' + res['last_name']
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        the name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer which represents the exact number of matched
        items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ScanIndexForward is not specified, the results are returned in
            ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

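    # NOTE: ``_query`` above (and ``_scan`` below) return a plain dict
    # contract that ``ResultSet`` consumes page-by-page, e.g.:
    #
    #     {'results': [<Item>, ...],
    #      'last_key': {'username': 'johndoe'}}   # or ``None`` on the last page
    #
    # A non-``None`` ``last_key`` is fed back in as ``exclusive_start_key``
    # to fetch the next page.
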
    def scan(self, limit=None, segment=None, total_segments=None,
             max_page_size=None, attributes=None, conditional_operator=None,
             **filter_kwargs):
        """
        Scans across all items within a DynamoDB table.

        Scans can be performed against a hash key or a hash+range key. You can
        additionally filter the results after the table has been read but
        before the response is returned by using query filters.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts a ``segment`` parameter, which should be an integer
        of the segment to retrieve on. Please see the documentation about
        Parallel Scans (Default: ``None`` - no segments)

        Optionally accepts a ``total_segments`` parameter, which should be an
        integer count of number of segments to divide the table into.
        Please see the documentation about Parallel Scans (Default: ``None`` -
        no segments)

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` parameter of the API
        and sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # All results.
            >>> everything = users.scan()

            # Look for last names beginning with "D".
            >>> results = users.scan(last_name__beginswith='D')
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'John'
            'Jane'

            # Use an ``IN`` filter & limit.
            >>> results = users.scan(
            ...     age__in=[25, 26, 27, 28, 29],
            ...     limit=1
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'

        """
        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes': attributes,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._scan, **kwargs)
        return results

    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
              total_segments=None, attributes=None, conditional_operator=None,
              **filter_kwargs):
        """
        The internal method that performs the actual scan. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes_to_get': attributes,
            'conditional_operator': conditional_operator,
        }

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['scan_filter'] = self._build_filters(
            filter_kwargs,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.scan(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def batch_get(self, keys, consistent=False, attributes=None):
        """
        Fetches many specific items in batch from a table.

        Requires a ``keys`` parameter, which should be a list of dictionaries.
        Each dictionary should contain the key value(s) that identify the
        item to fetch.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, a strongly consistent read will be
        used. (Default: False)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            >>> results = users.batch_get(keys=[
            ...     {
            ...         'username': 'johndoe',
            ...     },
            ...     {
            ...         'username': 'jane',
            ...     },
            ...     {
            ...         'username': 'fred',
            ...     },
            ... ])
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'
            'Fred'

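            # A minimal sketch: the same kind of fetch, but using a strongly
            # consistent read & returning only ``first_name``
            # (illustrative data).
            >>> results = users.batch_get(keys=[
            ...     {'username': 'johndoe'},
            ... ], consistent=True, attributes=('first_name',))
            >>> for res in results:
            ...     print res['first_name']
            'John'
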
| 1540 """ | |
| 1541 # We pass the keys to the constructor instead, so it can maintain it's | |
| 1542 # own internal state as to what keys have been processed. | |
| 1543 results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get) | |
| 1544 results.to_call(self._batch_get, consistent=consistent, attributes=attributes) | |
| 1545 return results | |
| 1546 | |
    def _batch_get(self, keys, consistent=False, attributes=None):
        """
        The internal method that performs the actual batch get. Used extensively
        by ``BatchGetResultSet`` to perform each (paginated) request.
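
        Returns a dict of the form shown below (a sketch of what the code
        assembles from the raw DynamoDB response)::

            {
                'results': [...],            # a list of ``Item`` instances
                'last_key': None,            # always ``None`` here
                'unprocessed_keys': [...],   # decoded keys to retry
            }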
| 1551 """ | |
| 1552 items = { | |
| 1553 self.table_name: { | |
| 1554 'Keys': [], | |
| 1555 }, | |
| 1556 } | |
| 1557 | |
| 1558 if consistent: | |
| 1559 items[self.table_name]['ConsistentRead'] = True | |
| 1560 | |
| 1561 if attributes is not None: | |
| 1562 items[self.table_name]['AttributesToGet'] = attributes | |
| 1563 | |
| 1564 for key_data in keys: | |
| 1565 raw_key = {} | |
| 1566 | |
| 1567 for key, value in key_data.items(): | |
| 1568 raw_key[key] = self._dynamizer.encode(value) | |
| 1569 | |
| 1570 items[self.table_name]['Keys'].append(raw_key) | |
| 1571 | |
| 1572 raw_results = self.connection.batch_get_item(request_items=items) | |
| 1573 results = [] | |
| 1574 unprocessed_keys = [] | |
| 1575 | |
| 1576 for raw_item in raw_results['Responses'].get(self.table_name, []): | |
| 1577 item = Item(self) | |
| 1578 item.load({ | |
| 1579 'Item': raw_item, | |
| 1580 }) | |
| 1581 results.append(item) | |
| 1582 | |
| 1583 raw_unproccessed = raw_results.get('UnprocessedKeys', {}) | |
| 1584 | |
| 1585 for raw_key in raw_unproccessed.get('Keys', []): | |
| 1586 py_key = {} | |
| 1587 | |
| 1588 for key, value in raw_key.items(): | |
| 1589 py_key[key] = self._dynamizer.decode(value) | |
| 1590 | |
| 1591 unprocessed_keys.append(py_key) | |
| 1592 | |
| 1593 return { | |
| 1594 'results': results, | |
| 1595 # NEVER return a ``last_key``. Just in-case any part of | |
| 1596 # ``ResultSet`` peeks through, since much of the | |
| 1597 # original underlying implementation is based on this key. | |
| 1598 'last_key': None, | |
| 1599 'unprocessed_keys': unprocessed_keys, | |
| 1600 } | |
| 1601 | |
    def count(self):
        """
        Returns a (very) eventually consistent count of the number of items
        in a table.

        DynamoDB updates this count roughly every six hours, so don't expect
        a high degree of accuracy.

        Example::

            >>> users.count()
            6

        """
        info = self.describe()
        return info['Table'].get('ItemCount', 0)


class BatchTable(object):
    """
    Used by ``Table`` as the context manager for batch writes.

    You likely don't want to try to use this object directly.
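
    Instead, you typically obtain one via ``Table.batch_write``, which
    yields a ``BatchTable`` as a context manager & flushes any pending
    writes on exit. A minimal sketch, assuming a ``users`` table::

        >>> with users.batch_write() as batch:
        ...     batch.put_item(data={'username': 'jane', 'first_name': 'Jane'})
        ...     batch.delete_item(username='johndoe')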
| 1624 """ | |
| 1625 def __init__(self, table): | |
| 1626 self.table = table | |
| 1627 self._to_put = [] | |
| 1628 self._to_delete = [] | |
| 1629 self._unprocessed = [] | |
| 1630 | |
| 1631 def __enter__(self): | |
| 1632 return self | |
| 1633 | |
| 1634 def __exit__(self, type, value, traceback): | |
| 1635 if self._to_put or self._to_delete: | |
| 1636 # Flush anything that's left. | |
| 1637 self.flush() | |
| 1638 | |
| 1639 if self._unprocessed: | |
| 1640 # Finally, handle anything that wasn't processed. | |
| 1641 self.resend_unprocessed() | |
| 1642 | |
    def put_item(self, data, overwrite=False):
        # Note: ``overwrite`` is accepted here but not used; a batch put
        # always overwrites an existing item with the same key.
        self._to_put.append(data)

        if self.should_flush():
            self.flush()

    def delete_item(self, **kwargs):
        self._to_delete.append(kwargs)

        if self.should_flush():
            self.flush()

    def should_flush(self):
        # DynamoDB's ``BatchWriteItem`` accepts at most 25 requests per
        # call, so flush as soon as we've accumulated that many.
        if len(self._to_put) + len(self._to_delete) == 25:
            return True

        return False

    def flush(self):
        batch_data = {
            self.table.table_name: [
                # We'll insert data here shortly.
            ],
        }

        for put in self._to_put:
            item = Item(self.table, data=put)
            batch_data[self.table.table_name].append({
                'PutRequest': {
                    'Item': item.prepare_full(),
                }
            })

        for delete in self._to_delete:
            batch_data[self.table.table_name].append({
                'DeleteRequest': {
                    'Key': self.table._encode_keys(delete),
                }
            })

        resp = self.table.connection.batch_write_item(batch_data)
        self.handle_unprocessed(resp)

        self._to_put = []
        self._to_delete = []
        return True

    def handle_unprocessed(self, resp):
        if len(resp.get('UnprocessedItems', [])):
            table_name = self.table.table_name
            unprocessed = resp['UnprocessedItems'].get(table_name, [])

            # Some items have not been processed. Stow them for now &
            # re-attempt processing on ``__exit__``.
            msg = "%s items were unprocessed. Storing for later."
            boto.log.info(msg % len(unprocessed))
            self._unprocessed.extend(unprocessed)

    def resend_unprocessed(self):
        # If there are unprocessed records (for instance, the user was over
        # their throughput limitations), iterate over them & resend in
        # batches until they've all been written.
        boto.log.info(
            "Re-sending %s unprocessed items." % len(self._unprocessed)
        )

        while len(self._unprocessed):
            # Again, do a maximum of 25 at a time.
            to_resend = self._unprocessed[:25]
            # Remove them from the list.
            self._unprocessed = self._unprocessed[25:]
            batch_data = {
                self.table.table_name: to_resend
            }
            boto.log.info("Sending %s items" % len(to_resend))
            resp = self.table.connection.batch_write_item(batch_data)
            self.handle_unprocessed(resp)
            boto.log.info(
                "%s unprocessed items left" % len(self._unprocessed)
            )
