From f92a5381365780b8477d820b9acdbd1603ffafe2 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 7 Nov 2017 11:01:17 -0800 Subject: [PATCH 01/33] Small type updates and support for schemas - Insert object ids as raw binary - Support for default values and not null - More rational quoting for binary data --- lib/mosql/schema.rb | 104 +++++++++++++++++++++--------------------- lib/mosql/sql.rb | 2 +- lib/mosql/streamer.rb | 5 +- 3 files changed, 58 insertions(+), 53 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7e0f119..8f6fa2e 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -9,11 +9,21 @@ def to_array(lst) col = nil if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String) # new configuration format - col = { - :source => ent.fetch(:source), - :type => ent.fetch(:type), - :name => (ent.keys - [:source, :type]).first, - } + ment = ent.clone + col = {} + col[:source] = ment.delete(:source) + col[:type] = ment.delete(:type) + col[:default] = ment.delete(:default) if ent.has_key?(:default) + col[:notnull] = ment.delete(:notnull) if ent.has_key?(:notnull) + + + if col[:type].downcase.include? "not null" or col[:type].downcase.include? "default" + raise SchemaError.new("Type has modifiers, use fields to modify type instead: #{ent.inspect}") + end + if ment.keys.length != 1 + raise SchemaError.new("Invalid new configuration entry #{ent.inspect}") + end + col[:name] = ment.keys.first elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String) col = { :source => ent.first.first, @@ -75,20 +85,38 @@ def initialize(map) Sequel.default_timezone = :utc end + def qualified_table_name(meta) + if meta.key?(:schema) + Sequel.qualify(meta[:schema], meta[:table]) + else + meta[:table].to_sym + end + end + def create_schema(db, clobber=false) @map.values.each do |dbspec| dbspec.each do |n, collection| next unless n.is_a?(String) meta = collection[:meta] + table_name = qualified_table_name(meta) composite_key = meta[:composite_key] keys = [] - log.info("Creating table '#{meta[:table]}'...") - db.send(clobber ? :create_table! : :create_table?, meta[:table]) do + log.info("Creating table #{db.literal(table_name)}...") + db.send(clobber ? :create_table! : :create_table?, table_name) do collection[:columns].each do |col| opts = {} - if col[:source] == '$timestamp' + if col.key?(:default) + if col[:default] == "now()" + opts[:default] = Sequel.function(:now) + else + opts[:default] = col[:default] + end + elsif col[:source] == '$timestamp' opts[:default] = Sequel.function(:now) end + if col.key?(:notnull) + opts[:null] = !col[:notnull] + end column col[:name], col[:type], opts if composite_key and composite_key.include?(col[:name]) @@ -185,10 +213,13 @@ def fetch_special_source(obj, source, original) end end - def transform_primitive(v, type=nil) + def transform_primitive(v, type) case v - when BSON::ObjectId, Symbol + when Symbol v.to_s + # Hex decode the object ID to a blob so we insert raw binary. + when BSON::ObjectId + Sequel::SQL::Blob.new([v.to_s].pack("H*")) when BSON::Binary if type.downcase == 'uuid' v.to_s.unpack("H*").first @@ -197,6 +228,8 @@ def transform_primitive(v, type=nil) end when BSON::DBRef v.object_id.to_s + when Hash, Array + JSON.dump(v) else v end @@ -223,10 +256,10 @@ def transform(ns, obj, schema=nil) v = fetch_and_delete_dotted(obj, source) case v when Hash - v = JSON.dump(Hash[v.map { |k,v| [k, transform_primitive(v)] }]) + v = JSON.dump(v) when Array - v = v.map { |it| transform_primitive(it) } if col[:array_type] + v = v.map { |it| transform_primitive(it, col[:array_type]) } v = Sequel.pg_array(v, col[:array_type]) else v = JSON.dump(v) @@ -284,52 +317,21 @@ def all_columns(schema, copy=false) end def all_columns_for_copy(schema) - all_columns(schema, true) + # + # We need to return Symbols so that Sequel's DB##copy_into quotes them + # correctly. + # + all_columns(schema, true).map{ |c| c.to_sym } end def copy_data(db, ns, objs) schema = find_ns!(ns) - db.synchronize do |pg| - sql = "COPY \"#{schema[:meta][:table]}\" " + - "(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN" - pg.execute(sql) - objs.each do |o| - pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n") - end - pg.put_copy_end - begin - pg.get_result.check - rescue PGError => e - db.send(:raise_error, e) - end - end - end - - def quote_copy(val) - case val - when nil - "\\N" - when true - 't' - when false - 'f' - when Sequel::SQL::Function - nil - when DateTime, Time - val.strftime("%FT%T.%6N %z") - when Sequel::SQL::Blob - "\\\\x" + [val].pack("h*") - else - val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1') - end - end - - def transform_to_copy(ns, row, schema=nil) - row.map { |c| quote_copy(c) }.compact.join("\t") + table = qualified_table_name(schema[:meta]) + db[table].import(all_columns_for_copy(schema), objs) end def table_for_ns(ns) - find_ns!(ns)[:meta][:table] + qualified_table_name(find_ns!(ns)[:meta]) end def all_mongo_dbs diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb index 2ebda38..b81b500 100644 --- a/lib/mosql/sql.rb +++ b/lib/mosql/sql.rb @@ -23,7 +23,7 @@ def connect_db(uri, pgschema) end def table_for_ns(ns) - @db[@schema.table_for_ns(ns).intern] + @db[@schema.table_for_ns(ns)] end def transform_one_ns(ns, obj) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d630e96..9820019 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -52,7 +52,7 @@ def bulk_upsert(table, ns, items) begin @schema.copy_data(table.db, ns, items) rescue Sequel::DatabaseError => e - log.debug("Bulk insert error (#{e}), attempting invidual upserts...") + log.warn("Bulk insert error (#{e}), attempting invidual upserts...") cols = @schema.all_columns(@schema.find_ns(ns)) items.each do |it| h = {} @@ -163,6 +163,9 @@ def import_collection(ns, collection, filter) unless batch.empty? bulk_upsert(table, ns, batch) end + + elapsed = Time.now - start + log.info("Finished import of #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") end def optail From 31b82d0d230d6adffcc6ae91085ee84aa4b8046c Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 7 Nov 2017 11:01:17 -0800 Subject: [PATCH 02/33] Support nested objects with foreign keys --- lib/mosql/schema.rb | 227 +++++++++++++++++++++++----------- lib/mosql/sql.rb | 32 ++--- lib/mosql/streamer.rb | 65 +++++----- mosql.gemspec | 2 + test/unit/lib/mosql/schema.rb | 4 +- 5 files changed, 197 insertions(+), 133 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 8f6fa2e..fba937b 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -1,3 +1,5 @@ +require 'active_support/inflector' + module MoSQL class SchemaError < StandardError; end; @@ -52,9 +54,17 @@ def check_columns!(ns, spec) end end - def parse_spec(ns, spec) + def parse_spec(ns, spec, source=[]) out = spec.dup - out[:columns] = to_array(spec.fetch(:columns)) + out[:columns] = to_array(spec.delete(:columns)) + meta = spec.delete(:meta) + out[:subtables] = spec.map do |name, subspec| + newsource = source + [name] + subspec = parse_spec(ns , subspec, newsource) + subspec[:meta][:source] = newsource + subspec[:meta][:parent_fkey] = (meta[:table].to_s.singularize + "_id").to_sym + subspec + end check_columns!(ns, out) out end @@ -93,53 +103,76 @@ def qualified_table_name(meta) end end - def create_schema(db, clobber=false) - @map.values.each do |dbspec| - dbspec.each do |n, collection| - next unless n.is_a?(String) - meta = collection[:meta] - table_name = qualified_table_name(meta) - composite_key = meta[:composite_key] - keys = [] - log.info("Creating table #{db.literal(table_name)}...") - db.send(clobber ? :create_table! : :create_table?, table_name) do - collection[:columns].each do |col| - opts = {} - if col.key?(:default) - if col[:default] == "now()" - opts[:default] = Sequel.function(:now) - else - opts[:default] = col[:default] - end - elsif col[:source] == '$timestamp' + def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil) + meta = spec[:meta] + table_name = qualified_table_name(meta) + composite_key = meta[:composite_key] + keys = [] + keytypes = [] + log.info("Creating table #{db.literal(table_name)}...") + db.drop_table?(table_name, :cascade => true) if clobber + db.create_table(table_name) do + spec[:columns].each do |col| + opts = {} + if col.key?(:default) + if col[:default] == "now()" opts[:default] = Sequel.function(:now) + else + opts[:default] = col[:default] end - if col.key?(:notnull) - opts[:null] = !col[:notnull] - end - column col[:name], col[:type], opts + elsif col[:source] == '$timestamp' + opts[:default] = Sequel.function(:now) + end + if col.key?(:notnull) + opts[:null] = !col[:notnull] + end + column col[:name], col[:type], opts + + if composite_key and composite_key.include?(col[:name]) + keys << col[:name].to_sym + keytypes << col[:type] + elsif not composite_key and col[:source].to_sym == :_id + keys << col[:name].to_sym + keytypes << col[:type] + end + end - if composite_key and composite_key.include?(col[:name]) - keys << col[:name].to_sym - elsif not composite_key and col[:source].to_sym == :_id - keys << col[:name].to_sym + if meta[:extra_props] + type = + case meta[:extra_props] + when 'JSON' + 'JSON' + when 'JSONB' + 'JSONB' + else + 'TEXT' end - end + column '_extra_props', type + end - primary_key keys - if meta[:extra_props] - type = - case meta[:extra_props] - when 'JSON' - 'JSON' - when 'JSONB' - 'JSONB' - else - 'TEXT' - end - column '_extra_props', type - end + if !parent_table.nil? + foreign_key meta[:parent_fkey], parent_table, { + :type => parent_pk_type, + :on_delete => :cascade, + :on_update => :cascade + } + keys << meta[:parent_fkey] + keytypes << parent_pk_type end + primary_key keys + end + + spec[:subtables].each do |subspec| + raise "Too many keys for sub table in #{table_name}: #{keys}" unless keys.length == 1 + create_table(db, subspec, clobber, table_name, keytypes.first) + end + end + + def create_schema(db, clobber=false) + @map.values.each do |dbspec| + dbspec.each do |n, collection| + next unless n.is_a?(String) + create_table(db, collection, clobber) end end end @@ -235,20 +268,18 @@ def transform_primitive(v, type) end end - def transform(ns, obj, schema=nil) - schema ||= find_ns!(ns) - + def transform_one(schema, obj) original = obj # Do a deep clone, because we're potentially going to be # mutating embedded objects. obj = BSON.deserialize(BSON.serialize(obj)) - row = [] + row = {} schema[:columns].each do |col| - source = col[:source] type = col[:type] + name = col[:name] if source.start_with?("$") v = fetch_special_source(obj, source, original) @@ -268,7 +299,7 @@ def transform(ns, obj, schema=nil) v = transform_primitive(v, type) end end - row << v + row[name] = v end if schema[:meta][:extra_props] @@ -301,37 +332,79 @@ def sanitize(value) end end - def copy_column?(col) - col[:source] != '$timestamp' - end - - def all_columns(schema, copy=false) - cols = [] - schema[:columns].each do |col| - cols << col[:name] unless copy && !copy_column?(col) - end + def all_columns(schema) + cols = schema[:columns].map { |col| col[:name] } if schema[:meta][:extra_props] cols << "_extra_props" end cols end - def all_columns_for_copy(schema) - # - # We need to return Symbols so that Sequel's DB##copy_into quotes them - # correctly. - # - all_columns(schema, true).map{ |c| c.to_sym } + def primary_table_name_for_ns(ns) + qualified_table_name(find_ns!(ns)[:meta]) + end + + def table_names_for_schema(schema) + [qualified_table_name(schema[:meta])] + schema[:subtables].map { |s| table_names_for_schema(s) }.flatten + end + + def all_table_names_for_ns(ns) + table_names_for_schema(find_ns!(ns)) + end + + def transform_one_ns(ns, obj) + transform_one(find_ns!(ns), obj) + end - def copy_data(db, ns, objs) + def save_all_pks_for_ns(ns, new, old) schema = find_ns!(ns) - table = qualified_table_name(schema[:meta]) - db[table].import(all_columns_for_copy(schema), objs) + primary_sql_keys = primary_sql_keys_for_schema(schema) + + primary_sql_keys.each do |key| + source = schema[:columns].find {|c| c[:name] == key }[:source] + new[source] = old[source] unless new.has_key? source + end + + new end - def table_for_ns(ns) - qualified_table_name(find_ns!(ns)[:meta]) + def bson_dig(obj, *keys) + keys.each do |k| + obj = obj[k.to_s] + break if obj.nil? + end + obj + end + + def all_transforms_for_obj(schema, obj, parent_pks={}, &block) + table_ident = qualified_table_name(schema[:meta]) + primary_keys = primary_sql_keys_for_schema(schema) + + # Make sure to add in the primary keys from any parent tables, since we + # might not automatically have them. + transformed = transform_one(schema, obj).update(parent_pks) + + yield table_ident, primary_keys, transformed + + schema[:subtables].each do |subspec| + source = subspec[:meta][:source] + subobjs = bson_dig(obj, *source) + break if subobjs.nil? + + raise "Too many primary keys" if primary_keys.length > 1 + pks = {subspec[:meta][:parent_fkey] => transformed[primary_keys[0]]} + subobjs.each do |subobj| + all_transforms_for_obj(subspec, subobj, pks, &block) + end + end + end + + def all_transforms_for_ns(ns, documents, &block) + schema = find_ns!(ns) + documents.each do |obj| + all_transforms_for_obj(schema, obj, &block) + end end def all_mongo_dbs @@ -342,16 +415,22 @@ def collections_for_mongo_db(db) (@map[db]||{}).keys end - def primary_sql_key_for_ns(ns) - ns = find_ns!(ns) + def primary_sql_keys_for_schema(schema) keys = [] - if ns[:meta][:composite_key] - keys = ns[:meta][:composite_key] + if schema[:meta][:composite_key] + keys = schema[:meta][:composite_key] else - keys << ns[:columns].find {|c| c[:source] == '_id'}[:name] + keys << schema[:columns].find {|c| c[:source] == '_id'}[:name] + end + if schema[:meta][:parent_fkey] + keys << schema[:meta][:parent_fkey] end return keys end + + def primary_sql_keys_for_ns(ns) + primary_sql_keys_for_schema(find_ns!(ns)) + end end end diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb index b81b500..a19ca89 100644 --- a/lib/mosql/sql.rb +++ b/lib/mosql/sql.rb @@ -22,33 +22,25 @@ def connect_db(uri, pgschema) end) end - def table_for_ns(ns) - @db[@schema.table_for_ns(ns)] - end - - def transform_one_ns(ns, obj) - h = {} - cols = @schema.all_columns(@schema.find_ns(ns)) - row = @schema.transform(ns, obj) - cols.zip(row).each { |k,v| h[k] = v } - h + def table_for_ident(ident) + @db[ident] end def upsert_ns(ns, obj) - h = transform_one_ns(ns, obj) - upsert!(table_for_ns(ns), @schema.primary_sql_key_for_ns(ns), h) + @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, row| + upsert!(table_for_ident(table), pks, row) + end end def delete_ns(ns, obj) - primary_sql_keys = @schema.primary_sql_key_for_ns(ns) - h = transform_one_ns(ns, obj) - query = {} - primary_sql_keys.each do |key| - raise "No #{primary_sql_keys} found in transform of #{obj.inspect}" if h[key].nil? - query[key.to_sym] = h[key] + @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, row| + query = {} + pks.each do |key| + raise "No #{primary_sql_keys} found in transform of #{obj.inspect}" if row[key].nil? + query[key.to_sym] = row[key] + end + table_for_ident(table).where(query).delete end - - table_for_ns(ns).where(query).delete end def upsert!(table, table_primary_keys, item) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 9820019..78812c7 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -49,25 +49,14 @@ def unsafe_handle_exceptions(ns, obj) end def bulk_upsert(table, ns, items) - begin - @schema.copy_data(table.db, ns, items) - rescue Sequel::DatabaseError => e - log.warn("Bulk insert error (#{e}), attempting invidual upserts...") - cols = @schema.all_columns(@schema.find_ns(ns)) - items.each do |it| - h = {} - cols.zip(it).each { |k,v| h[k] = v } - unsafe_handle_exceptions(ns, h) do - @sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h) - end - end - end + table.multi_insert(items) end def with_retries(tries=10) tries.times do |try| begin yield + break rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e # Duplicate key error raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code) @@ -129,13 +118,29 @@ def initial_import def did_truncate; @did_truncate ||= {}; end + def upsert_all_batches(batches, ns) + # We use all_table_names_for_ns so we can ensure we write the parent table + # before we write the child. + sql_time = 0 + @schema.all_table_names_for_ns(ns).map do |table_name| + unless batches[table_name].empty? + sql_time += track_time do + bulk_upsert(@sql.table_for_ident(table_name), ns, + batches[table_name]) + batches[table_name].clear + end + end + end + sql_time + end + def import_collection(ns, collection, filter) log.info("Importing for #{ns}...") count = 0 - batch = [] - table = @sql.table_for_ns(ns) + batches = Hash[@schema.all_table_names_for_ns(ns).map { |n| [n, []] }] + table = @sql.table_for_ident(@schema.primary_table_name_for_ns(ns)) unless options[:no_drop_tables] || did_truncate[table.first_source] - table.truncate + table.truncate :cascade => true did_truncate[table.first_source] = true end @@ -143,26 +148,22 @@ def import_collection(ns, collection, filter) sql_time = 0 collection.find(filter, :batch_size => BATCH) do |cursor| with_retries do - cursor.each do |obj| - batch << @schema.transform(ns, obj) + @schema.all_transforms_for_ns(ns, cursor) do |ident, _, row| + table = @sql.table_for_ident(ident) count += 1 + batches[ident] << row - if batch.length >= BATCH - sql_time += track_time do - bulk_upsert(table, ns, batch) - end + if count % BATCH == 0 + sql_time += upsert_all_batches(batches, ns) elapsed = Time.now - start log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") - batch.clear exit(0) if @done end end end end - unless batch.empty? - bulk_upsert(table, ns, batch) - end + sql_time += upsert_all_batches(batches, ns) elapsed = Time.now - start log.info("Finished import of #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") @@ -234,22 +235,12 @@ def handle_op(op) log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})") sync_object(ns, selector) else - # The update operation replaces the existing object, but # preserves its _id field, so grab the _id off of the # 'query' field -- it's not guaranteed to be present on the # update. - primary_sql_keys = @schema.primary_sql_key_for_ns(ns) - schema = @schema.find_ns!(ns) - keys = {} - primary_sql_keys.each do |key| - source = schema[:columns].find {|c| c[:name] == key }[:source] - keys[source] = selector[source] - end - - log.debug("upsert #{ns}: #{keys}") + update = @schema.save_all_pks_for_ns(ns, update, selector) - update = keys.merge(update) unsafe_handle_exceptions(ns, update) do @sql.upsert_ns(ns, update) end diff --git a/mosql.gemspec b/mosql.gemspec index 8089b30..70884b9 100644 --- a/mosql.gemspec +++ b/mosql.gemspec @@ -28,6 +28,8 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "bson", "~> 1.10" gem.add_runtime_dependency "bson_ext", "~> 1.10" + gem.add_runtime_dependency "activesupport", "4.2.6" + gem.add_development_dependency "minitest" gem.add_development_dependency "mocha" end diff --git a/test/unit/lib/mosql/schema.rb b/test/unit/lib/mosql/schema.rb index 399008e..2e52a5a 100644 --- a/test/unit/lib/mosql/schema.rb +++ b/test/unit/lib/mosql/schema.rb @@ -104,8 +104,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test end it 'Can find the primary key of the SQL table' do - assert_equal(['id'], @map.primary_sql_key_for_ns('db.collection')) - assert_equal(['_id'], @map.primary_sql_key_for_ns('db.old_conf_syntax')) + assert_equal(['id'], @map.primary_sql_keys_for_ns('db.collection')) + assert_equal(['_id'], @map.primary_sql_keys_for_ns('db.old_conf_syntax')) end it 'can create a SQL schema' do From b001d8cb0b1723b3c6a4c4bd0245523fd28a4511 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Mon, 13 Nov 2017 00:14:37 -0800 Subject: [PATCH 03/33] Stricter error checking --- lib/mosql/schema.rb | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index fba937b..acfd3de 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -246,6 +246,18 @@ def fetch_special_source(obj, source, original) end end + def sanity_check_type(v, type) + type = type.downcase + if (not v.nil? and not v.is_a? Time and type == "timestamp") or + (v.is_a? Time and not type == "timestamp") or + (v.is_a? Integer and not type.end_with?('int')) or + (not v.nil? and not v.is_a? Integer and type.end_with?('int') and v.modulo(1) != 0) + false + else + true + end + end + def transform_primitive(v, type) case v when Symbol @@ -254,13 +266,9 @@ def transform_primitive(v, type) when BSON::ObjectId Sequel::SQL::Blob.new([v.to_s].pack("H*")) when BSON::Binary - if type.downcase == 'uuid' - v.to_s.unpack("H*").first - else - Sequel::SQL::Blob.new(v.to_s) - end + Sequel::SQL::Blob.new(v.to_s) when BSON::DBRef - v.object_id.to_s + Sequel::SQL::Blob.new([v.object_id.to_s].pack("H*")) when Hash, Array JSON.dump(v) else @@ -276,6 +284,9 @@ def transform_one(schema, obj) obj = BSON.deserialize(BSON.serialize(obj)) row = {} + sql_pks = primary_sql_keys_for_schema(schema) + pk_cols = schema[:columns].select{ |c| sql_pks.include?(c[:name]) } + pks = Hash[pk_cols.map { |c| [c[:name], bson_dig_dotted(obj, c[:source])] }] schema[:columns].each do |col| source = col[:source] type = col[:type] @@ -299,6 +310,15 @@ def transform_one(schema, obj) v = transform_primitive(v, type) end end + + null_allowed = !col[:notnull] or col.has_key?(:default) + if v.nil? and not null_allowed + raise "Invalid null #{source.inspect} for #{pks.inspect}" + elsif v.is_a? Sequel::SQL::Blob and type != "bytea" + raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{pks.inspect}" + elsif not sanity_check_type(v, type) + raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{pks.inspect}" + end row[name] = v end @@ -377,6 +397,10 @@ def bson_dig(obj, *keys) obj end + def bson_dig_dotted(obj, path) + bson_dig(obj, *path.split(".")) + end + def all_transforms_for_obj(schema, obj, parent_pks={}, &block) table_ident = qualified_table_name(schema[:meta]) primary_keys = primary_sql_keys_for_schema(schema) From 8f7c22fe42e4d39182cb5d32e282786bfde98ef4 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Mon, 13 Nov 2017 12:33:20 -0800 Subject: [PATCH 04/33] Better support for alternative primary keys --- lib/mosql/schema.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index acfd3de..e104605 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -442,7 +442,7 @@ def collections_for_mongo_db(db) def primary_sql_keys_for_schema(schema) keys = [] if schema[:meta][:composite_key] - keys = schema[:meta][:composite_key] + keys = schema[:meta][:composite_key].map{ |k| k.to_sym } else keys << schema[:columns].find {|c| c[:source] == '_id'}[:name] end From 677175c461d3d253d43590554bba4de35baa51fb Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Mon, 13 Nov 2017 16:14:39 -0800 Subject: [PATCH 05/33] no active record --- lib/mosql/schema.rb | 2 +- mosql.gemspec | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index e104605..98bcedc 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -1,4 +1,4 @@ -require 'active_support/inflector' +Sequel.extension :inflector module MoSQL class SchemaError < StandardError; end; diff --git a/mosql.gemspec b/mosql.gemspec index 70884b9..8089b30 100644 --- a/mosql.gemspec +++ b/mosql.gemspec @@ -28,8 +28,6 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "bson", "~> 1.10" gem.add_runtime_dependency "bson_ext", "~> 1.10" - gem.add_runtime_dependency "activesupport", "4.2.6" - gem.add_development_dependency "minitest" gem.add_development_dependency "mocha" end From 34330999b4f08452d3435a011b519f93efc0951e Mon Sep 17 00:00:00 2001 From: Josh Beam Date: Mon, 13 Nov 2017 16:39:11 -0800 Subject: [PATCH 06/33] fix timestamp --- lib/mosql/schema.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 98bcedc..b46fddd 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -248,8 +248,8 @@ def fetch_special_source(obj, source, original) def sanity_check_type(v, type) type = type.downcase - if (not v.nil? and not v.is_a? Time and type == "timestamp") or - (v.is_a? Time and not type == "timestamp") or + if (not v.nil? and not v.is_a? Time and type.include? "timestamp") or + (v.is_a? Time and not type.include? "timestamp") or (v.is_a? Integer and not type.end_with?('int')) or (not v.nil? and not v.is_a? Integer and type.end_with?('int') and v.modulo(1) != 0) false From 68d67ddcceec06cddf7f00ba4e446e983f3213d8 Mon Sep 17 00:00:00 2001 From: Josh Beam Date: Mon, 13 Nov 2017 16:49:50 -0800 Subject: [PATCH 07/33] add run script for migration --- run.sh | 1 + 1 file changed, 1 insertion(+) create mode 100755 run.sh diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..ce5ed62 --- /dev/null +++ b/run.sh @@ -0,0 +1 @@ +RUBYLIB=$PWD/lib:$RUBYLIB ./bin/mosql From 38d5d4e38003591d0ec5339d5c9815ea4b53dc87 Mon Sep 17 00:00:00 2001 From: Josh Beam Date: Mon, 13 Nov 2017 16:50:09 -0800 Subject: [PATCH 08/33] fix run script --- run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/run.sh b/run.sh index ce5ed62..e7a02b8 100755 --- a/run.sh +++ b/run.sh @@ -1 +1,3 @@ -RUBYLIB=$PWD/lib:$RUBYLIB ./bin/mosql +#!/bin/bash + +RUBYLIB=$PWD/lib:$RUBYLIB ./bin/mosql "$@" From 18f043349e24c7ba4880a05748819fe6c32efe36 Mon Sep 17 00:00:00 2001 From: Josh Beam Date: Mon, 13 Nov 2017 16:59:25 -0800 Subject: [PATCH 09/33] =?UTF-8?q?do=20alex=E2=80=99s=20run=20script?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/run.sh | 7 +++++++ run.sh | 3 --- 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100755 bin/run.sh delete mode 100755 run.sh diff --git a/bin/run.sh b/bin/run.sh new file mode 100755 index 0000000..6638065 --- /dev/null +++ b/bin/run.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +scriptdir=$(python -c "import os; print(os.path.realpath('$(dirname $0)'))") +rootdir=$scriptdir/../ + +export RUBYLIB=$rootdir/lib:$RUBYLIB +exec $rootdir/bin/mosql "$@" diff --git a/run.sh b/run.sh deleted file mode 100755 index e7a02b8..0000000 --- a/run.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -RUBYLIB=$PWD/lib:$RUBYLIB ./bin/mosql "$@" From 1ee7f024982cf0dd5aa4a41fa7c222845cf5934f Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Mon, 13 Nov 2017 23:52:33 -0800 Subject: [PATCH 10/33] looser type checking --- lib/mosql/schema.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index b46fddd..ebef497 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -250,8 +250,8 @@ def sanity_check_type(v, type) type = type.downcase if (not v.nil? and not v.is_a? Time and type.include? "timestamp") or (v.is_a? Time and not type.include? "timestamp") or - (v.is_a? Integer and not type.end_with?('int')) or - (not v.nil? and not v.is_a? Integer and type.end_with?('int') and v.modulo(1) != 0) + (v.is_a? Integer and not type.include?('int') and not type.include?('float')) or + (not v.nil? and not v.is_a? Integer and type.include?('int') and v.modulo(1) != 0) false else true From 0919a8b1609ee6ef866a1ccdeaac9dd08e0c2032 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 14 Nov 2017 00:02:13 -0800 Subject: [PATCH 11/33] parallel import --- lib/mosql/streamer.rb | 6 ++++-- mosql.gemspec | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 78812c7..621788a 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -1,3 +1,5 @@ +require 'Parallel' + module MoSQL class Streamer include MoSQL::Logging @@ -106,7 +108,7 @@ def initial_import db = @mongo.db(dbname) collections = db.collections.select { |c| spec.key?(c.name) } - collections.each do |collection| + Parallel.each(collections, in_threads: 8) do |collection| ns = "#{dbname}.#{collection.name}" import_collection(ns, collection, spec[collection.name][:meta][:filter]) exit(0) if @done @@ -156,7 +158,7 @@ def import_collection(ns, collection, filter) if count % BATCH == 0 sql_time += upsert_all_batches(batches, ns) elapsed = Time.now - start - log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...") + log.info("Imported #{count} rows into #{collection} (#{elapsed}s, #{sql_time}s SQL)...") exit(0) if @done end end diff --git a/mosql.gemspec b/mosql.gemspec index 8089b30..2bb14b3 100644 --- a/mosql.gemspec +++ b/mosql.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "rake" gem.add_runtime_dependency "log4r" gem.add_runtime_dependency "json" + gem.add_runtime_dependency "parallel" gem.add_runtime_dependency "mongoriver", "0.4" From 2d7950d1ea829eeb2f9cced40123c2950fc389c2 Mon Sep 17 00:00:00 2001 From: Josh Beam Date: Tue, 14 Nov 2017 11:52:59 -0800 Subject: [PATCH 12/33] fix sanity check --- lib/mosql/schema.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index ebef497..7c6697f 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -316,6 +316,12 @@ def transform_one(schema, obj) raise "Invalid null #{source.inspect} for #{pks.inspect}" elsif v.is_a? Sequel::SQL::Blob and type != "bytea" raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{pks.inspect}" + elsif col[:array_type] + v.each_with_index do |e, i| + if not sanity_check_type(e, col[:array_type]) + raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{pks.inspect}" + end + end elsif not sanity_check_type(v, type) raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{pks.inspect}" end From 9e10bfb7b190449a071145ea80a1d04d77db36bc Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 14 Nov 2017 12:03:54 -0800 Subject: [PATCH 13/33] allow subtables of composite key tables --- lib/mosql/schema.rb | 111 +++++++++++++++++++++++------------------- lib/mosql/streamer.rb | 2 +- 2 files changed, 63 insertions(+), 50 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 7c6697f..e9715bd 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -54,15 +54,21 @@ def check_columns!(ns, spec) end end - def parse_spec(ns, spec, source=[]) + def parent_scope_column(parent, colname) + (parent.to_s.singularize + "_" + colname.to_s).to_sym + end + + def parse_spec(ns, spec, source=[], parent_pks=[]) out = spec.dup out[:columns] = to_array(spec.delete(:columns)) meta = spec.delete(:meta) + pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(meta[:table], k) } + out[:subtables] = spec.map do |name, subspec| newsource = source + [name] - subspec = parse_spec(ns , subspec, newsource) + subspec = parse_spec(ns , subspec, newsource, pks) subspec[:meta][:source] = newsource - subspec[:meta][:parent_fkey] = (meta[:table].to_s.singularize + "_id").to_sym + subspec[:meta][:parent_fkeys] = pks subspec end check_columns!(ns, out) @@ -103,12 +109,11 @@ def qualified_table_name(meta) end end - def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil) + def create_table(db, spec, clobber, parent_table=nil, parent_pks={}) meta = spec[:meta] table_name = qualified_table_name(meta) composite_key = meta[:composite_key] - keys = [] - keytypes = [] + primary_keys = {} log.info("Creating table #{db.literal(table_name)}...") db.drop_table?(table_name, :cascade => true) if clobber db.create_table(table_name) do @@ -129,11 +134,9 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil) column col[:name], col[:type], opts if composite_key and composite_key.include?(col[:name]) - keys << col[:name].to_sym - keytypes << col[:type] + primary_keys[col[:name].to_sym] = col[:type] elsif not composite_key and col[:source].to_sym == :_id - keys << col[:name].to_sym - keytypes << col[:type] + primary_keys[col[:name].to_sym] = col[:type] end end @@ -151,20 +154,20 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil) end if !parent_table.nil? - foreign_key meta[:parent_fkey], parent_table, { - :type => parent_pk_type, + parent_pks.each do |k, type| + column k, type + end + foreign_key parent_pks.keys, parent_table, { :on_delete => :cascade, :on_update => :cascade } - keys << meta[:parent_fkey] - keytypes << parent_pk_type end - primary_key keys + primary_key primary_keys.keys + parent_pks.keys end + parent_pks = Hash[primary_keys.map { |k, t| [parent_scope_column(meta[:table], k), t] }].merge(parent_pks) spec[:subtables].each do |subspec| - raise "Too many keys for sub table in #{table_name}: #{keys}" unless keys.length == 1 - create_table(db, subspec, clobber, table_name, keytypes.first) + create_table(db, subspec, clobber, table_name, parent_pks) end end @@ -276,17 +279,41 @@ def transform_primitive(v, type) end end - def transform_one(schema, obj) + def transform_value(col, v) + case v + when Hash + JSON.dump(v) + when Array + if col[:array_type] + v = v.map { |it| transform_primitive(it, col[:array_type]) } + Sequel.pg_array(v, col[:array_type]) + else + JSON.dump(v) + end + else + transform_primitive(v, col[:type]) + end + end + + def get_pks_for_debug(schema, obj, parent_pks={}) + pks = parent_pks.clone + sql_pks = primary_sql_keys_for_schema(schema) + schema[:columns].each do |col| + break unless sql_pks.include?(col[:name]) + + pks[col[:name]] = bson_dig_dotted(obj, col[:source]) + end + pks + end + + def transform_one(schema, obj, parent_pks={}) original = obj # Do a deep clone, because we're potentially going to be # mutating embedded objects. obj = BSON.deserialize(BSON.serialize(obj)) - row = {} - sql_pks = primary_sql_keys_for_schema(schema) - pk_cols = schema[:columns].select{ |c| sql_pks.include?(c[:name]) } - pks = Hash[pk_cols.map { |c| [c[:name], bson_dig_dotted(obj, c[:source])] }] + row = parent_pks.clone schema[:columns].each do |col| source = col[:source] type = col[:type] @@ -295,35 +322,22 @@ def transform_one(schema, obj) if source.start_with?("$") v = fetch_special_source(obj, source, original) else - v = fetch_and_delete_dotted(obj, source) - case v - when Hash - v = JSON.dump(v) - when Array - if col[:array_type] - v = v.map { |it| transform_primitive(it, col[:array_type]) } - v = Sequel.pg_array(v, col[:array_type]) - else - v = JSON.dump(v) - end - else - v = transform_primitive(v, type) - end + v = transform_value(col, fetch_and_delete_dotted(obj, source)) end null_allowed = !col[:notnull] or col.has_key?(:default) if v.nil? and not null_allowed - raise "Invalid null #{source.inspect} for #{pks.inspect}" + raise "Invalid null #{source.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" elsif v.is_a? Sequel::SQL::Blob and type != "bytea" - raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{pks.inspect}" - elsif col[:array_type] + raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" + elsif col[:array_type] and not v.nil? v.each_with_index do |e, i| if not sanity_check_type(e, col[:array_type]) - raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{pks.inspect}" + raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" end end - elsif not sanity_check_type(v, type) - raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{pks.inspect}" + elsif not v.nil? and not sanity_check_type(v, type) + raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" end row[name] = v end @@ -380,11 +394,11 @@ def all_table_names_for_ns(ns) def transform_one_ns(ns, obj) transform_one(find_ns!(ns), obj) - end def save_all_pks_for_ns(ns, new, old) schema = find_ns!(ns) + # We only save top level keys. primary_sql_keys = primary_sql_keys_for_schema(schema) primary_sql_keys.each do |key| @@ -413,17 +427,19 @@ def all_transforms_for_obj(schema, obj, parent_pks={}, &block) # Make sure to add in the primary keys from any parent tables, since we # might not automatically have them. - transformed = transform_one(schema, obj).update(parent_pks) + transformed = transform_one(schema, obj, parent_pks) yield table_ident, primary_keys, transformed + pks = Hash[primary_keys.map { |k| [ + parent_scope_column(schema[:meta][:table], k), + transformed[k] + ] } ].update(parent_pks) schema[:subtables].each do |subspec| source = subspec[:meta][:source] subobjs = bson_dig(obj, *source) break if subobjs.nil? - raise "Too many primary keys" if primary_keys.length > 1 - pks = {subspec[:meta][:parent_fkey] => transformed[primary_keys[0]]} subobjs.each do |subobj| all_transforms_for_obj(subspec, subobj, pks, &block) end @@ -452,9 +468,6 @@ def primary_sql_keys_for_schema(schema) else keys << schema[:columns].find {|c| c[:source] == '_id'}[:name] end - if schema[:meta][:parent_fkey] - keys << schema[:meta][:parent_fkey] - end return keys end diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 621788a..dd3df63 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -158,7 +158,7 @@ def import_collection(ns, collection, filter) if count % BATCH == 0 sql_time += upsert_all_batches(batches, ns) elapsed = Time.now - start - log.info("Imported #{count} rows into #{collection} (#{elapsed}s, #{sql_time}s SQL)...") + log.info("Imported #{count} rows into #{ns} (#{elapsed}s, #{sql_time}s SQL)...") exit(0) if @done end end From 0b00ea0e1014ad288e56823f392447090bec7586 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 14 Nov 2017 18:00:22 -0800 Subject: [PATCH 14/33] properly find nested names --- lib/mosql/schema.rb | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index e9715bd..446dd5e 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -58,16 +58,15 @@ def parent_scope_column(parent, colname) (parent.to_s.singularize + "_" + colname.to_s).to_sym end - def parse_spec(ns, spec, source=[], parent_pks=[]) + def parse_spec(ns, spec, parent_pks=[]) out = spec.dup out[:columns] = to_array(spec.delete(:columns)) meta = spec.delete(:meta) pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(meta[:table], k) } out[:subtables] = spec.map do |name, subspec| - newsource = source + [name] - subspec = parse_spec(ns , subspec, newsource, pks) - subspec[:meta][:source] = newsource + subspec = parse_spec(ns , subspec, pks) + subspec[:meta][:source] = name.to_s subspec[:meta][:parent_fkeys] = pks subspec end @@ -437,7 +436,7 @@ def all_transforms_for_obj(schema, obj, parent_pks={}, &block) ] } ].update(parent_pks) schema[:subtables].each do |subspec| source = subspec[:meta][:source] - subobjs = bson_dig(obj, *source) + subobjs = bson_dig_dotted(obj, source) break if subobjs.nil? subobjs.each do |subobj| From d1d25c07ab273e3a73aa0e136f0a1bb51c5ef587 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Wed, 15 Nov 2017 20:55:27 -0800 Subject: [PATCH 15/33] parent keys come before child keys in tables --- lib/mosql/schema.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 446dd5e..8543d9f 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -161,10 +161,10 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pks={}) :on_update => :cascade } end - primary_key primary_keys.keys + parent_pks.keys + primary_key parent_pks.keys + primary_keys.keys end - parent_pks = Hash[primary_keys.map { |k, t| [parent_scope_column(meta[:table], k), t] }].merge(parent_pks) + parent_pks = parent_pks.merge(Hash[primary_keys.map { |k, t| [parent_scope_column(meta[:table], k), t] }]) spec[:subtables].each do |subspec| create_table(db, subspec, clobber, table_name, parent_pks) end From 30f1bdf2a52b1cb3266be8bdb0581591dc162af8 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 17 Nov 2017 23:43:37 -0800 Subject: [PATCH 16/33] case sensitive --- lib/mosql/streamer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index dd3df63..9a5cddf 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -1,4 +1,4 @@ -require 'Parallel' +require 'parallel' module MoSQL class Streamer From ac49560c34b5dcf3f0f310a40b6130f1cd385035 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Sat, 18 Nov 2017 08:41:44 -0800 Subject: [PATCH 17/33] limit max parallelism --- lib/mosql/streamer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 9a5cddf..d980f13 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -108,7 +108,7 @@ def initial_import db = @mongo.db(dbname) collections = db.collections.select { |c| spec.key?(c.name) } - Parallel.each(collections, in_threads: 8) do |collection| + Parallel.each(collections, in_threads: 4) do |collection| ns = "#{dbname}.#{collection.name}" import_collection(ns, collection, spec[collection.name][:meta][:filter]) exit(0) if @done From 537910ad1b47b07e43408e2e30f999ce495d7470 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Wed, 22 Nov 2017 13:36:06 -0800 Subject: [PATCH 18/33] bigger pool --- lib/mosql/cli.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb index 4f537ec..94c2e43 100644 --- a/lib/mosql/cli.rb +++ b/lib/mosql/cli.rb @@ -121,7 +121,7 @@ def parse_args end def connect_mongo - @mongo = Mongo::MongoClient.from_uri(options[:mongo]) + @mongo = Mongo::MongoClient.from_uri(options[:mongo], :pool_size => 8) config = @mongo['admin'].command(:ismaster => 1) if !config['setName'] && !options[:skip_tail] log.warn("`#{options[:mongo]}' is not a replset.") From 98e282f985f29708b2060d021859f35c79528422 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Sun, 26 Nov 2017 16:09:45 -0800 Subject: [PATCH 19/33] print using original --- lib/mosql/schema.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 8543d9f..5b3ea08 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -326,17 +326,17 @@ def transform_one(schema, obj, parent_pks={}) null_allowed = !col[:notnull] or col.has_key?(:default) if v.nil? and not null_allowed - raise "Invalid null #{source.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" + raise "Invalid null #{source.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" elsif v.is_a? Sequel::SQL::Blob and type != "bytea" - raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" + raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" elsif col[:array_type] and not v.nil? v.each_with_index do |e, i| if not sanity_check_type(e, col[:array_type]) - raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" + raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" end end elsif not v.nil? and not sanity_check_type(v, type) - raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}" + raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" end row[name] = v end From 74263237953598e1441cf9f02469b75ff7194785 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Thu, 30 Nov 2017 20:13:38 -0800 Subject: [PATCH 20/33] unconditionally save tail state on initial import --- lib/mosql/streamer.rb | 12 +++++------- lib/mosql/version.rb | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index d980f13..cd40afe 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -80,12 +80,10 @@ def track_time def initial_import @schema.create_schema(@sql.db, !options[:no_drop_tables]) - unless options[:skip_tail] - start_state = { - 'time' => nil, - 'position' => @tailer.most_recent_position - } - end + start_state = { + 'time' => nil, + 'position' => @tailer.most_recent_position + } dbnames = [] @@ -115,7 +113,7 @@ def initial_import end end - tailer.save_state(start_state) unless options[:skip_tail] + tailer.save_state(start_state) end def did_truncate; @did_truncate ||= {}; end diff --git a/lib/mosql/version.rb b/lib/mosql/version.rb index f4449b1..70c29c6 100644 --- a/lib/mosql/version.rb +++ b/lib/mosql/version.rb @@ -1,3 +1,3 @@ module MoSQL - VERSION = "0.4.3" + VERSION = "0.4.4" end From 1818ecf8dbd187100f21879844fb0ca1a88c2832 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 1 Dec 2017 09:39:36 -0800 Subject: [PATCH 21/33] fix primary key handling for sub tables --- lib/mosql/schema.rb | 13 ++++++++----- lib/mosql/sql.rb | 15 +++------------ lib/mosql/version.rb | 2 +- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 5b3ea08..db19ce7 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -338,7 +338,7 @@ def transform_one(schema, obj, parent_pks={}) elsif not v.nil? and not sanity_check_type(v, type) raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" end - row[name] = v + row[name.to_sym] = v end if schema[:meta][:extra_props] @@ -422,15 +422,18 @@ def bson_dig_dotted(obj, path) def all_transforms_for_obj(schema, obj, parent_pks={}, &block) table_ident = qualified_table_name(schema[:meta]) - primary_keys = primary_sql_keys_for_schema(schema) # Make sure to add in the primary keys from any parent tables, since we # might not automatically have them. transformed = transform_one(schema, obj, parent_pks) + primary_keys = Hash[primary_sql_keys_for_schema(schema).map { |k| + [ k, transformed[k] ] + }].update(parent_pks) + yield table_ident, primary_keys, transformed - pks = Hash[primary_keys.map { |k| [ + new_parent_pks = Hash[primary_sql_keys_for_schema(schema).map { |k| [ parent_scope_column(schema[:meta][:table], k), transformed[k] ] } ].update(parent_pks) @@ -440,7 +443,7 @@ def all_transforms_for_obj(schema, obj, parent_pks={}, &block) break if subobjs.nil? subobjs.each do |subobj| - all_transforms_for_obj(subspec, subobj, pks, &block) + all_transforms_for_obj(subspec, subobj,new_parent_pks, &block) end end end @@ -465,7 +468,7 @@ def primary_sql_keys_for_schema(schema) if schema[:meta][:composite_key] keys = schema[:meta][:composite_key].map{ |k| k.to_sym } else - keys << schema[:columns].find {|c| c[:source] == '_id'}[:name] + keys << schema[:columns].find {|c| c[:source] == '_id'}[:name].to_sym end return keys diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb index a19ca89..383540d 100644 --- a/lib/mosql/sql.rb +++ b/lib/mosql/sql.rb @@ -33,22 +33,13 @@ def upsert_ns(ns, obj) end def delete_ns(ns, obj) - @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, row| - query = {} - pks.each do |key| - raise "No #{primary_sql_keys} found in transform of #{obj.inspect}" if row[key].nil? - query[key.to_sym] = row[key] - end - table_for_ident(table).where(query).delete + @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, _| + table_for_ident(table).where(pks).delete end end def upsert!(table, table_primary_keys, item) - query = {} - table_primary_keys.each do |key| - query[key.to_sym] = item[key] - end - rows = table.where(query).update(item) + rows = table.where(table_primary_keys).update(item) if rows == 0 begin table.insert(item) diff --git a/lib/mosql/version.rb b/lib/mosql/version.rb index 70c29c6..6f5783b 100644 --- a/lib/mosql/version.rb +++ b/lib/mosql/version.rb @@ -1,3 +1,3 @@ module MoSQL - VERSION = "0.4.4" + VERSION = "0.4.5" end From 5aacf664ab9175f61d0ac9d1da6ee694b22d3273 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 1 Dec 2017 10:36:28 -0800 Subject: [PATCH 22/33] :name to be a to_sym --- lib/mosql/schema.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index db19ce7..0654f79 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -25,11 +25,11 @@ def to_array(lst) if ment.keys.length != 1 raise SchemaError.new("Invalid new configuration entry #{ent.inspect}") end - col[:name] = ment.keys.first + col[:name] = ment.keys.first.to_sym elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String) col = { :source => ent.first.first, - :name => ent.first.first, + :name => ent.first.first.to_sym, :type => ent.first.last } else From 322f876e219cb1a64506aa43d8bd7d9d22c32517 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 1 Dec 2017 17:22:31 -0800 Subject: [PATCH 23/33] always store types as syms --- lib/mosql/schema.rb | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 0654f79..22134ca 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -59,10 +59,11 @@ def parent_scope_column(parent, colname) end def parse_spec(ns, spec, parent_pks=[]) - out = spec.dup - out[:columns] = to_array(spec.delete(:columns)) - meta = spec.delete(:meta) - pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(meta[:table], k) } + out = { + :columns => to_array(spec.delete(:columns)), + :meta => parse_collection_meta(spec.delete(:meta)) + } + pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(out[:meta][:table], k) } out[:subtables] = spec.map do |name, subspec| subspec = parse_spec(ns , subspec, pks) @@ -74,7 +75,13 @@ def parse_spec(ns, spec, parent_pks=[]) out end - def parse_meta(meta) + def parse_collection_meta(meta) + meta = {} if meta.nil? + meta[:composite_key] = meta[:composite_key].map { |k| k.to_sym } if meta[:composite_key] + meta + end + + def parse_db_meta(meta) meta = {} if meta.nil? meta[:alias] = [] unless meta.key?(:alias) meta[:alias] = [meta[:alias]] unless meta[:alias].is_a?(Array) @@ -85,7 +92,7 @@ def parse_meta(meta) def initialize(map) @map = {} map.each do |dbname, db| - @map[dbname] = { :meta => parse_meta(db[:meta]) } + @map[dbname] = { :meta => parse_db_meta(db[:meta]) } db.each do |cname, spec| next unless cname.is_a?(String) begin @@ -133,9 +140,9 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pks={}) column col[:name], col[:type], opts if composite_key and composite_key.include?(col[:name]) - primary_keys[col[:name].to_sym] = col[:type] + primary_keys[col[:name]] = col[:type] elsif not composite_key and col[:source].to_sym == :_id - primary_keys[col[:name].to_sym] = col[:type] + primary_keys[col[:name]] = col[:type] end end @@ -466,9 +473,9 @@ def collections_for_mongo_db(db) def primary_sql_keys_for_schema(schema) keys = [] if schema[:meta][:composite_key] - keys = schema[:meta][:composite_key].map{ |k| k.to_sym } + keys = schema[:meta][:composite_key].clone else - keys << schema[:columns].find {|c| c[:source] == '_id'}[:name].to_sym + keys << schema[:columns].find {|c| c[:source] == '_id'}[:name] end return keys From 9e97eaaac99523bc7410327c733e0002bf7dd678 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Mon, 4 Dec 2017 15:32:01 -0800 Subject: [PATCH 24/33] cant log from a signal handler --- lib/mosql/cli.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb index 94c2e43..9a8f4c0 100644 --- a/lib/mosql/cli.rb +++ b/lib/mosql/cli.rb @@ -26,7 +26,6 @@ def initialize(args) def setup_signal_handlers %w[TERM INT USR2].each do |sig| Signal.trap(sig) do - log.info("Got SIG#{sig}. Preparing to exit...") @streamer.stop end end From dc6684e40be93f6f4e27b214bf08f3640d222a25 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 8 Dec 2017 09:14:09 -0800 Subject: [PATCH 25/33] puts in signal handler is ok --- lib/mosql/cli.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb index 9a8f4c0..a7a939f 100644 --- a/lib/mosql/cli.rb +++ b/lib/mosql/cli.rb @@ -26,6 +26,7 @@ def initialize(args) def setup_signal_handlers %w[TERM INT USR2].each do |sig| Signal.trap(sig) do + puts("Got SIG#{sig}. Preparing to exit...") @streamer.stop end end From a4aa00207f03bd67b06b092563a6db6a0b5d3ca8 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 8 Dec 2017 17:03:28 -0800 Subject: [PATCH 26/33] types can be numerics --- lib/mosql/schema.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 22134ca..6eb49c2 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -259,7 +259,7 @@ def sanity_check_type(v, type) type = type.downcase if (not v.nil? and not v.is_a? Time and type.include? "timestamp") or (v.is_a? Time and not type.include? "timestamp") or - (v.is_a? Integer and not type.include?('int') and not type.include?('float')) or + (v.is_a? Integer and not type.include?('int') and not type.include?('float') and not type.include?("numeric")) or (not v.nil? and not v.is_a? Integer and type.include?('int') and v.modulo(1) != 0) false else From db5c8b14d341d2f856542926200c16cd67e0ac00 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Sun, 17 Dec 2017 12:32:53 -0800 Subject: [PATCH 27/33] better handle delete op --- lib/mosql/schema.rb | 20 ++++++++++++-------- lib/mosql/sql.rb | 6 +++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index 6eb49c2..c9d9717 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -312,7 +312,7 @@ def get_pks_for_debug(schema, obj, parent_pks={}) pks end - def transform_one(schema, obj, parent_pks={}) + def transform_one(schema, obj, parent_pks={}, only_pks=false) original = obj # Do a deep clone, because we're potentially going to be @@ -320,35 +320,39 @@ def transform_one(schema, obj, parent_pks={}) obj = BSON.deserialize(BSON.serialize(obj)) row = parent_pks.clone + pk_names = primary_sql_keys_for_schema(schema) schema[:columns].each do |col| source = col[:source] type = col[:type] name = col[:name] + next if only_pks and not pk_names.include?(col[:name]) + if source.start_with?("$") v = fetch_special_source(obj, source, original) else v = transform_value(col, fetch_and_delete_dotted(obj, source)) end + obj_description = "#{get_pks_for_debug(schema, original, parent_pks)} of #{qualified_table_name(schema[:meta])}" null_allowed = !col[:notnull] or col.has_key?(:default) if v.nil? and not null_allowed - raise "Invalid null #{source.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" + raise "Invalid null #{source.inspect} for #{obj_description}" elsif v.is_a? Sequel::SQL::Blob and type != "bytea" - raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" + raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{obj_description}" elsif col[:array_type] and not v.nil? v.each_with_index do |e, i| if not sanity_check_type(e, col[:array_type]) - raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" + raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{obj_description}" end end elsif not v.nil? and not sanity_check_type(v, type) - raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, original, parent_pks)}" + raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{obj_description}" end row[name.to_sym] = v end - if schema[:meta][:extra_props] + if schema[:meta][:extra_props] and not only_pks extra = sanitize(obj) row << JSON.dump(extra) end @@ -481,8 +485,8 @@ def primary_sql_keys_for_schema(schema) return keys end - def primary_sql_keys_for_ns(ns) - primary_sql_keys_for_schema(find_ns!(ns)) + def primary_sql_keys_for_ns_obj(ns, obj) + transform_one(find_ns!(ns), obj, {}, true) end end end diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb index 383540d..675a673 100644 --- a/lib/mosql/sql.rb +++ b/lib/mosql/sql.rb @@ -33,9 +33,9 @@ def upsert_ns(ns, obj) end def delete_ns(ns, obj) - @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, _| - table_for_ident(table).where(pks).delete - end + table = table_for_ident(@schema.primary_table_name_for_ns(ns)) + pks = @schema.primary_sql_keys_for_ns_obj(ns, obj) + table.where(pks).delete end def upsert!(table, table_primary_keys, item) From 55a0ba247650f24c3e239287acad74c09d967160 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Sun, 17 Dec 2017 17:44:03 -0800 Subject: [PATCH 28/33] more likely to quit --- lib/mosql/streamer.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index cd40afe..df57337 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -157,8 +157,8 @@ def import_collection(ns, collection, filter) sql_time += upsert_all_batches(batches, ns) elapsed = Time.now - start log.info("Imported #{count} rows into #{ns} (#{elapsed}s, #{sql_time}s SQL)...") - exit(0) if @done end + exit(0) if @done end end end @@ -176,10 +176,11 @@ def optail end tailer.tail(:from => tail_from, :filter => options[:oplog_filter]) until @done - tailer.stream(1000) do |op| + tailer.stream(5) do |op| handle_op(op) end end + tailer.save_state end def sync_object(ns, selector) From 04eaa698080a382648e67ab3115868eea39c2db3 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Tue, 26 Dec 2017 17:20:26 -0500 Subject: [PATCH 29/33] bail out early if we fail to import a collection --- lib/mosql/streamer.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index df57337..2bec414 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -108,7 +108,12 @@ def initial_import Parallel.each(collections, in_threads: 4) do |collection| ns = "#{dbname}.#{collection.name}" - import_collection(ns, collection, spec[collection.name][:meta][:filter]) + begin + import_collection(ns, collection, spec[collection.name][:meta][:filter]) + rescue Exception => ex + log.error("Error importing collection #{ns} - #{ex.message}:\n#{ex.backtrace.join("\n")}") + raise Parallel::Kill + end exit(0) if @done end end From c2bb55c1b1d6d08d1579b5bf8a807050630701dd Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Thu, 28 Dec 2017 19:32:05 -0500 Subject: [PATCH 30/33] better error message --- lib/mosql/schema.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index c9d9717..b09c6d8 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -305,7 +305,7 @@ def get_pks_for_debug(schema, obj, parent_pks={}) pks = parent_pks.clone sql_pks = primary_sql_keys_for_schema(schema) schema[:columns].each do |col| - break unless sql_pks.include?(col[:name]) + next unless sql_pks.include?(col[:name]) pks[col[:name]] = bson_dig_dotted(obj, col[:source]) end @@ -334,7 +334,7 @@ def transform_one(schema, obj, parent_pks={}, only_pks=false) v = transform_value(col, fetch_and_delete_dotted(obj, source)) end - obj_description = "#{get_pks_for_debug(schema, original, parent_pks)} of #{qualified_table_name(schema[:meta])}" + obj_description = "#{get_pks_for_debug(schema, original, parent_pks)} of #{schema[:meta][:table]}" null_allowed = !col[:notnull] or col.has_key?(:default) if v.nil? and not null_allowed raise "Invalid null #{source.inspect} for #{obj_description}" From 55b820d36170d55d9577378aba733cefd3ce7fa2 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Thu, 28 Dec 2017 19:41:41 -0500 Subject: [PATCH 31/33] dont bail out early --- lib/mosql/streamer.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb index 2bec414..2416e22 100644 --- a/lib/mosql/streamer.rb +++ b/lib/mosql/streamer.rb @@ -112,7 +112,6 @@ def initial_import import_collection(ns, collection, spec[collection.name][:meta][:filter]) rescue Exception => ex log.error("Error importing collection #{ns} - #{ex.message}:\n#{ex.backtrace.join("\n")}") - raise Parallel::Kill end exit(0) if @done end From 6ec11c92569994d588e720afc71b57fa987c3964 Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 29 Dec 2017 11:55:11 -0800 Subject: [PATCH 32/33] ignore vendor --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 66be0c8..77dcef8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ collections.yml /.bundle/ Gemfile.lock +vendor From 0f19c6247ce32a8305de4267d0a28d6d44745b5f Mon Sep 17 00:00:00 2001 From: Alex Reece Date: Fri, 29 Dec 2017 12:35:09 -0800 Subject: [PATCH 33/33] next instead of break --- lib/mosql/schema.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb index b09c6d8..5109b57 100644 --- a/lib/mosql/schema.rb +++ b/lib/mosql/schema.rb @@ -451,7 +451,7 @@ def all_transforms_for_obj(schema, obj, parent_pks={}, &block) schema[:subtables].each do |subspec| source = subspec[:meta][:source] subobjs = bson_dig_dotted(obj, source) - break if subobjs.nil? + next if subobjs.nil? subobjs.each do |subobj| all_transforms_for_obj(subspec, subobj,new_parent_pks, &block)