comparison venv/bin/dynamodb_load @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
#!/Users/bclaywel/workspace/argo-navis/venv/bin/python2.7

import argparse
import os

import boto
from boto.compat import json
from boto.dynamodb.schema import Schema


DESCRIPTION = """Load data into one or more DynamoDB tables.

For each table, data is read from two files:
- {table_name}.metadata for the table's name, schema and provisioned
  throughput (only required if creating the table).
- {table_name}.data for the table's actual contents.

Both files are searched for in the current directory. To read them from
somewhere else, use the --in-dir parameter.

This program does not wipe the tables prior to loading data. However, any
items present in the data files will overwrite the table's contents.
"""


def _json_iterload(fd):
    """Lazily load newline-separated JSON objects from a file-like object."""
    buffer = ""
    eof = False
    while not eof:
        try:
            # Add a line to the buffer
            buffer += fd.next()
        except StopIteration:
            # We can't let that exception bubble up, otherwise the last
            # object in the file will never be decoded.
            eof = True
        try:
            # Try to decode a JSON object.
            json_object = json.loads(buffer.strip())

            # Success: clear the buffer (everything was decoded).
            buffer = ""
        except ValueError:
            if eof and buffer.strip():
                # No more lines to load and the buffer contains something other
                # than whitespace: the file is, in fact, malformed.
                raise
            # We couldn't decode a complete JSON object: load more lines.
            continue

        yield json_object
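
# The generator above keeps appending lines to its buffer until json.loads()
# succeeds, so an object may also be pretty-printed across several lines;
# only a buffer that still fails to parse at end-of-file is treated as
# malformed input.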


def create_table(metadata_fd):
    """Create a table from a metadata file-like object."""


def load_table(table, in_fd):
    """Load items into a table from a file-like object."""
    for i in _json_iterload(in_fd):
        # Convert lists back to sets.
        data = {}
        for k, v in i.iteritems():
            if isinstance(v, list):
                data[k] = set(v)
            else:
                data[k] = v
        table.new_item(attrs=data).put()
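
# For example (hypothetical item), the data-file line
#   {"username": "alice", "groups": ["admin", "dev"]}
# is written back with "groups" as the set {"admin", "dev"}, since JSON has
# no native set type.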


def dynamodb_load(tables, in_dir, create_tables):
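    # Credentials are resolved through boto's normal lookup (for example the
    # AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables or a
    # boto config file); nothing is taken from the command line here.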
    conn = boto.connect_dynamodb()
    for t in tables:
        metadata_file = os.path.join(in_dir, "%s.metadata" % t)
        data_file = os.path.join(in_dir, "%s.data" % t)
        if create_tables:
            with open(metadata_file) as meta_fd:
                metadata = json.load(meta_fd)
            table = conn.create_table(
                name=t,
                schema=Schema(metadata["schema"]),
                read_units=metadata["read_units"],
                write_units=metadata["write_units"],
            )
            table.refresh(wait_for_active=True)
        else:
            table = conn.get_table(t)

        with open(data_file) as in_fd:
            load_table(table, in_fd)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="dynamodb_load",
        description=DESCRIPTION
    )
    parser.add_argument(
        "--create-tables",
        action="store_true",
        help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)."
    )
    parser.add_argument("--in-dir", default=".")
    parser.add_argument("tables", metavar="TABLES", nargs="+")

    namespace = parser.parse_args()

    dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)
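
# Example invocation (hypothetical table names and directory):
#
#   dynamodb_load --create-tables --in-dir ./dynamodb-backup users sessions
#
# This would read users.metadata / users.data and sessions.metadata /
# sessions.data from ./dynamodb-backup, create both tables, wait for them to
# become ACTIVE, and then load their items.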