diff --git a/.gitignore b/.gitignore
index 66be0c8..77dcef8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 collections.yml
 /.bundle/
 Gemfile.lock
+vendor
diff --git a/bin/run.sh b/bin/run.sh
new file mode 100755
index 0000000..6638065
--- /dev/null
+++ b/bin/run.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+scriptdir=$(python -c "import os; print(os.path.realpath('$(dirname $0)'))")
+rootdir=$scriptdir/../
+
+export RUBYLIB=$rootdir/lib:$RUBYLIB
+exec $rootdir/bin/mosql "$@"
diff --git a/lib/mosql/cli.rb b/lib/mosql/cli.rb
index 4f537ec..a7a939f 100644
--- a/lib/mosql/cli.rb
+++ b/lib/mosql/cli.rb
@@ -26,7 +26,7 @@ def initialize(args)
     def setup_signal_handlers
       %w[TERM INT USR2].each do |sig|
         Signal.trap(sig) do
-          log.info("Got SIG#{sig}. Preparing to exit...")
+          puts("Got SIG#{sig}. Preparing to exit...")
           @streamer.stop
         end
       end
@@ -121,7 +121,7 @@ def parse_args
     end
 
     def connect_mongo
-      @mongo = Mongo::MongoClient.from_uri(options[:mongo])
+      @mongo = Mongo::MongoClient.from_uri(options[:mongo], :pool_size => 8)
       config = @mongo['admin'].command(:ismaster => 1)
       if !config['setName'] && !options[:skip_tail]
         log.warn("`#{options[:mongo]}' is not a replset.")
diff --git a/lib/mosql/schema.rb b/lib/mosql/schema.rb
index 7e0f119..5109b57 100644
--- a/lib/mosql/schema.rb
+++ b/lib/mosql/schema.rb
@@ -1,3 +1,5 @@
+Sequel.extension :inflector
+
 module MoSQL
   class SchemaError < StandardError; end;
 
@@ -9,15 +11,25 @@ def to_array(lst)
         col = nil
         if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
           # new configuration format
-          col = {
-            :source => ent.fetch(:source),
-            :type => ent.fetch(:type),
-            :name => (ent.keys - [:source, :type]).first,
-          }
+          ment = ent.clone
+          col = {}
+          col[:source] = ment.delete(:source)
+          col[:type] = ment.delete(:type)
+          col[:default] = ment.delete(:default) if ent.has_key?(:default)
+          col[:notnull] = ment.delete(:notnull) if ent.has_key?(:notnull)
+
+          if col[:type].downcase.include? "not null" or col[:type].downcase.include? "default"
+            raise SchemaError.new("Type has modifiers, use fields to modify type instead: #{ent.inspect}")
+          end
+          if ment.keys.length != 1
+            raise SchemaError.new("Invalid new configuration entry #{ent.inspect}")
+          end
+          col[:name] = ment.keys.first.to_sym
         elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
           col = {
             :source => ent.first.first,
-            :name => ent.first.first,
+            :name => ent.first.first.to_sym,
             :type => ent.first.last
           }
         else
@@ -42,14 +54,34 @@ def check_columns!(ns, spec)
       end
     end
 
-    def parse_spec(ns, spec)
-      out = spec.dup
-      out[:columns] = to_array(spec.fetch(:columns))
+    def parent_scope_column(parent, colname)
+      (parent.to_s.singularize + "_" + colname.to_s).to_sym
+    end
+
+    def parse_spec(ns, spec, parent_pks=[])
+      out = {
+        :columns => to_array(spec.delete(:columns)),
+        :meta => parse_collection_meta(spec.delete(:meta))
+      }
+      pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(out[:meta][:table], k) }
+
+      out[:subtables] = spec.map do |name, subspec|
+        subspec = parse_spec(ns, subspec, pks)
+        subspec[:meta][:source] = name.to_s
+        subspec[:meta][:parent_fkeys] = pks
+        subspec
+      end
       check_columns!(ns, out)
       out
     end
 
-    def parse_meta(meta)
+    def parse_collection_meta(meta)
+      meta = {} if meta.nil?
+      meta[:composite_key] = meta[:composite_key].map { |k| k.to_sym } if meta[:composite_key]
+      meta
+    end
+
+    def parse_db_meta(meta)
       meta = {} if meta.nil?
       meta[:alias] = [] unless meta.key?(:alias)
       meta[:alias] = [meta[:alias]] unless meta[:alias].is_a?(Array)
@@ -60,7 +92,7 @@ def parse_meta(meta)
     def initialize(map)
       @map = {}
       map.each do |dbname, db|
-        @map[dbname] = { :meta => parse_meta(db[:meta]) }
+        @map[dbname] = { :meta => parse_db_meta(db[:meta]) }
         db.each do |cname, spec|
           next unless cname.is_a?(String)
           begin
@@ -75,43 +107,81 @@ def initialize(map)
       Sequel.default_timezone = :utc
     end
 
-    def create_schema(db, clobber=false)
-      @map.values.each do |dbspec|
-        dbspec.each do |n, collection|
-          next unless n.is_a?(String)
-          meta = collection[:meta]
-          composite_key = meta[:composite_key]
-          keys = []
-          log.info("Creating table '#{meta[:table]}'...")
-          db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
-            collection[:columns].each do |col|
-              opts = {}
-              if col[:source] == '$timestamp'
+    def qualified_table_name(meta)
+      if meta.key?(:schema)
+        Sequel.qualify(meta[:schema], meta[:table])
+      else
+        meta[:table].to_sym
+      end
+    end
+
+    def create_table(db, spec, clobber, parent_table=nil, parent_pks={})
+      meta = spec[:meta]
+      table_name = qualified_table_name(meta)
+      composite_key = meta[:composite_key]
+      primary_keys = {}
+      log.info("Creating table #{db.literal(table_name)}...")
+      db.drop_table?(table_name, :cascade => true) if clobber
+      db.create_table(table_name) do
+        spec[:columns].each do |col|
+          opts = {}
+          if col.key?(:default)
+            if col[:default] == "now()"
              opts[:default] = Sequel.function(:now)
+            else
+              opts[:default] = col[:default]
            end
-          column col[:name], col[:type], opts
+          elsif col[:source] == '$timestamp'
+            opts[:default] = Sequel.function(:now)
+          end
+          if col.key?(:notnull)
+            opts[:null] = !col[:notnull]
+          end
+          column col[:name], col[:type], opts
 
-          if composite_key and composite_key.include?(col[:name])
-            keys << col[:name].to_sym
-          elsif not composite_key and col[:source].to_sym == :_id
-            keys << col[:name].to_sym
-          end
+          if composite_key and composite_key.include?(col[:name])
+            primary_keys[col[:name]] = col[:type]
+          elsif not composite_key and col[:source].to_sym == :_id
+            primary_keys[col[:name]] = col[:type]
           end
+        end
+
+        if meta[:extra_props]
+          type =
+            case meta[:extra_props]
+            when 'JSON'
+              'JSON'
+            when 'JSONB'
+              'JSONB'
+            else
+              'TEXT'
+            end
+          column '_extra_props', type
+        end
 
-        primary_key keys
-        if meta[:extra_props]
-          type =
-            case meta[:extra_props]
-            when 'JSON'
-              'JSON'
-            when 'JSONB'
-              'JSONB'
-            else
-              'TEXT'
-            end
-          column '_extra_props', type
+        if !parent_table.nil?
+          parent_pks.each do |k, type|
+            column k, type
          end
+          foreign_key parent_pks.keys, parent_table, {
+            :on_delete => :cascade,
+            :on_update => :cascade
+          }
         end
+        primary_key parent_pks.keys + primary_keys.keys
+      end
+
+      parent_pks = parent_pks.merge(Hash[primary_keys.map { |k, t| [parent_scope_column(meta[:table], k), t] }])
+      spec[:subtables].each do |subspec|
+        create_table(db, subspec, clobber, table_name, parent_pks)
+      end
+    end
+
+    def create_schema(db, clobber=false)
+      @map.values.each do |dbspec|
+        dbspec.each do |n, collection|
+          next unless n.is_a?(String)
+          create_table(db, collection, clobber)
         end
       end
     end
@@ -185,60 +255,104 @@ def fetch_special_source(obj, source, original)
       end
     end
 
-    def transform_primitive(v, type=nil)
+    def sanity_check_type(v, type)
+      type = type.downcase
+      if (not v.nil? and not v.is_a? Time and type.include? "timestamp") or
+         (v.is_a? Time and not type.include? "timestamp") or
+         (v.is_a? Integer and not type.include?('int') and not type.include?('float') and not type.include?("numeric")) or
+         (not v.nil? and not v.is_a? Integer and type.include?('int') and v.modulo(1) != 0)
+        false
+      else
+        true
+      end
+    end
+
+    def transform_primitive(v, type)
       case v
-      when BSON::ObjectId, Symbol
+      when Symbol
         v.to_s
+      # Hex decode the object ID to a blob so we insert raw binary.
+      when BSON::ObjectId
+        Sequel::SQL::Blob.new([v.to_s].pack("H*"))
       when BSON::Binary
-        if type.downcase == 'uuid'
-          v.to_s.unpack("H*").first
-        else
-          Sequel::SQL::Blob.new(v.to_s)
-        end
+        Sequel::SQL::Blob.new(v.to_s)
       when BSON::DBRef
-        v.object_id.to_s
+        Sequel::SQL::Blob.new([v.object_id.to_s].pack("H*"))
+      when Hash, Array
+        JSON.dump(v)
       else
         v
       end
     end
 
-    def transform(ns, obj, schema=nil)
-      schema ||= find_ns!(ns)
+    def transform_value(col, v)
+      case v
+      when Hash
+        JSON.dump(v)
+      when Array
+        if col[:array_type]
+          v = v.map { |it| transform_primitive(it, col[:array_type]) }
+          Sequel.pg_array(v, col[:array_type])
+        else
+          JSON.dump(v)
+        end
+      else
+        transform_primitive(v, col[:type])
+      end
+    end
 
+    def get_pks_for_debug(schema, obj, parent_pks={})
+      pks = parent_pks.clone
+      sql_pks = primary_sql_keys_for_schema(schema)
+      schema[:columns].each do |col|
+        next unless sql_pks.include?(col[:name])
+
+        pks[col[:name]] = bson_dig_dotted(obj, col[:source])
+      end
+      pks
+    end
+
+    def transform_one(schema, obj, parent_pks={}, only_pks=false)
       original = obj
 
       # Do a deep clone, because we're potentially going to be
       # mutating embedded objects.
       obj = BSON.deserialize(BSON.serialize(obj))
 
-      row = []
+      row = parent_pks.clone
+      pk_names = primary_sql_keys_for_schema(schema)
       schema[:columns].each do |col|
-
         source = col[:source]
         type = col[:type]
+        name = col[:name]
+
+        next if only_pks and not pk_names.include?(col[:name])
 
         if source.start_with?("$")
           v = fetch_special_source(obj, source, original)
         else
-          v = fetch_and_delete_dotted(obj, source)
-          case v
-          when Hash
-            v = JSON.dump(Hash[v.map { |k,v| [k, transform_primitive(v)] }])
-          when Array
-            v = v.map { |it| transform_primitive(it) }
-            if col[:array_type]
-              v = Sequel.pg_array(v, col[:array_type])
-            else
-              v = JSON.dump(v)
+          v = transform_value(col, fetch_and_delete_dotted(obj, source))
+        end
+
+        obj_description = "#{get_pks_for_debug(schema, original, parent_pks)} of #{schema[:meta][:table]}"
+        null_allowed = !col[:notnull] or col.has_key?(:default)
+        if v.nil? and not null_allowed
+          raise "Invalid null #{source.inspect} for #{obj_description}"
+        elsif v.is_a? Sequel::SQL::Blob and type != "bytea"
+          raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{obj_description}"
+        elsif col[:array_type] and not v.nil?
+          v.each_with_index do |e, i|
+            if not sanity_check_type(e, col[:array_type])
+              raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{obj_description}"
            end
-          else
-            v = transform_primitive(v, type)
-          end
+          end
+        elsif not v.nil? and not sanity_check_type(v, type)
+          raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{obj_description}"
         end
 
-        row << v
+        row[name.to_sym] = v
       end
 
-      if schema[:meta][:extra_props]
+      if schema[:meta][:extra_props] and not only_pks
         extra = sanitize(obj)
-        row << JSON.dump(extra)
+        row[:_extra_props] = JSON.dump(extra)
       end
@@ -268,68 +382,88 @@ def sanitize(value)
       end
     end
 
-    def copy_column?(col)
-      col[:source] != '$timestamp'
-    end
-
-    def all_columns(schema, copy=false)
-      cols = []
-      schema[:columns].each do |col|
-        cols << col[:name] unless copy && !copy_column?(col)
-      end
+    def all_columns(schema)
+      cols = schema[:columns].map { |col| col[:name] }
       if schema[:meta][:extra_props]
         cols << "_extra_props"
       end
       cols
     end
 
-    def all_columns_for_copy(schema)
-      all_columns(schema, true)
+    def primary_table_name_for_ns(ns)
+      qualified_table_name(find_ns!(ns)[:meta])
+    end
+
+    def table_names_for_schema(schema)
+      [qualified_table_name(schema[:meta])] + schema[:subtables].map { |s| table_names_for_schema(s) }.flatten
     end
 
-    def copy_data(db, ns, objs)
+    def all_table_names_for_ns(ns)
+      table_names_for_schema(find_ns!(ns))
+    end
+
+    def transform_one_ns(ns, obj)
+      transform_one(find_ns!(ns), obj)
+    end
+
+    def save_all_pks_for_ns(ns, new, old)
       schema = find_ns!(ns)
-      db.synchronize do |pg|
-        sql = "COPY \"#{schema[:meta][:table]}\" " +
-          "(#{all_columns_for_copy(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
-        pg.execute(sql)
-        objs.each do |o|
-          pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n")
-        end
-        pg.put_copy_end
-        begin
-          pg.get_result.check
-        rescue PGError => e
-          db.send(:raise_error, e)
-        end
+      # We only save top-level keys.
+      primary_sql_keys = primary_sql_keys_for_schema(schema)
+
+      primary_sql_keys.each do |key|
+        source = schema[:columns].find {|c| c[:name] == key }[:source]
+        new[source] = old[source] unless new.has_key? source
       end
+
+      new
     end
 
-    def quote_copy(val)
-      case val
-      when nil
-        "\\N"
-      when true
-        't'
-      when false
-        'f'
-      when Sequel::SQL::Function
-        nil
-      when DateTime, Time
-        val.strftime("%FT%T.%6N %z")
-      when Sequel::SQL::Blob
-        "\\\\x" + [val].pack("h*")
-      else
-        val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1')
+    def bson_dig(obj, *keys)
+      keys.each do |k|
+        obj = obj[k.to_s]
+        break if obj.nil?
       end
+      obj
+    end
+
+    def bson_dig_dotted(obj, path)
+      bson_dig(obj, *path.split("."))
     end
 
-    def transform_to_copy(ns, row, schema=nil)
-      row.map { |c| quote_copy(c) }.compact.join("\t")
+    def all_transforms_for_obj(schema, obj, parent_pks={}, &block)
+      table_ident = qualified_table_name(schema[:meta])
+
+      # Make sure to add in the primary keys from any parent tables, since we
+      # might not automatically have them.
+      transformed = transform_one(schema, obj, parent_pks)
+
+      primary_keys = Hash[primary_sql_keys_for_schema(schema).map { |k|
+        [ k, transformed[k] ]
+      }].update(parent_pks)
+
+      yield table_ident, primary_keys, transformed
+
+      new_parent_pks = Hash[primary_sql_keys_for_schema(schema).map { |k| [
+        parent_scope_column(schema[:meta][:table], k),
+        transformed[k]
+      ] }].update(parent_pks)
+      schema[:subtables].each do |subspec|
+        source = subspec[:meta][:source]
+        subobjs = bson_dig_dotted(obj, source)
+        next if subobjs.nil?
+
+        subobjs.each do |subobj|
+          all_transforms_for_obj(subspec, subobj, new_parent_pks, &block)
+        end
+      end
+    end
 
-    def table_for_ns(ns)
-      find_ns!(ns)[:meta][:table]
+    def all_transforms_for_ns(ns, documents, &block)
+      schema = find_ns!(ns)
+      documents.each do |obj|
+        all_transforms_for_obj(schema, obj, &block)
+      end
     end
 
     def all_mongo_dbs
@@ -340,16 +474,19 @@ def collections_for_mongo_db(db)
       (@map[db]||{}).keys
     end
 
-    def primary_sql_key_for_ns(ns)
-      ns = find_ns!(ns)
+    def primary_sql_keys_for_schema(schema)
       keys = []
-      if ns[:meta][:composite_key]
-        keys = ns[:meta][:composite_key]
+      if schema[:meta][:composite_key]
+        keys = schema[:meta][:composite_key].clone
       else
-        keys << ns[:columns].find {|c| c[:source] == '_id'}[:name]
+        keys << schema[:columns].find {|c| c[:source] == '_id'}[:name]
       end
       return keys
     end
+
+    def primary_sql_keys_for_ns_obj(ns, obj)
+      transform_one(find_ns!(ns), obj, {}, true)
+    end
   end
 end
diff --git a/lib/mosql/sql.rb b/lib/mosql/sql.rb
index 2ebda38..675a673 100644
--- a/lib/mosql/sql.rb
+++ b/lib/mosql/sql.rb
@@ -22,41 +22,24 @@ def connect_db(uri, pgschema)
       end)
     end
 
-    def table_for_ns(ns)
-      @db[@schema.table_for_ns(ns).intern]
-    end
-
-    def transform_one_ns(ns, obj)
-      h = {}
-      cols = @schema.all_columns(@schema.find_ns(ns))
-      row = @schema.transform(ns, obj)
-      cols.zip(row).each { |k,v| h[k] = v }
-      h
+    def table_for_ident(ident)
+      @db[ident]
     end
 
     def upsert_ns(ns, obj)
-      h = transform_one_ns(ns, obj)
-      upsert!(table_for_ns(ns), @schema.primary_sql_key_for_ns(ns), h)
+      @schema.all_transforms_for_ns(ns, [obj]) do |table, pks, row|
+        upsert!(table_for_ident(table), pks, row)
+      end
     end
 
     def delete_ns(ns, obj)
-      primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
-      h = transform_one_ns(ns, obj)
-      query = {}
-      primary_sql_keys.each do |key|
-        raise "No #{primary_sql_keys} found in transform of #{obj.inspect}" if h[key].nil?
-        query[key.to_sym] = h[key]
-      end
-
-      table_for_ns(ns).where(query).delete
+      table = table_for_ident(@schema.primary_table_name_for_ns(ns))
+      pks = @schema.primary_sql_keys_for_ns_obj(ns, obj)
+      table.where(pks).delete
     end
 
     def upsert!(table, table_primary_keys, item)
-      query = {}
-      table_primary_keys.each do |key|
-        query[key.to_sym] = item[key]
-      end
-      rows = table.where(query).update(item)
+      rows = table.where(table_primary_keys).update(item)
       if rows == 0
         begin
           table.insert(item)
diff --git a/lib/mosql/streamer.rb b/lib/mosql/streamer.rb
index d630e96..2416e22 100644
--- a/lib/mosql/streamer.rb
+++ b/lib/mosql/streamer.rb
@@ -1,3 +1,5 @@
+require 'parallel'
+
 module MoSQL
   class Streamer
     include MoSQL::Logging
@@ -49,25 +51,14 @@ def unsafe_handle_exceptions(ns, obj)
     end
 
     def bulk_upsert(table, ns, items)
-      begin
-        @schema.copy_data(table.db, ns, items)
-      rescue Sequel::DatabaseError => e
-        log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
-        cols = @schema.all_columns(@schema.find_ns(ns))
-        items.each do |it|
-          h = {}
-          cols.zip(it).each { |k,v| h[k] = v }
-          unsafe_handle_exceptions(ns, h) do
-            @sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
-          end
-        end
-      end
+      table.multi_insert(items)
     end
 
     def with_retries(tries=10)
       tries.times do |try|
         begin
           yield
+          break
         rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e
           # Duplicate key error
           raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code)
@@ -89,12 +80,10 @@ def track_time
     def initial_import
       @schema.create_schema(@sql.db, !options[:no_drop_tables])
 
-      unless options[:skip_tail]
-        start_state = {
-          'time' => nil,
-          'position' => @tailer.most_recent_position
-        }
-      end
+      start_state = {
+        'time' => nil,
+        'position' => @tailer.most_recent_position
+      }
 
       dbnames = []
 
@@ -117,25 +106,45 @@ def initial_import
         db = @mongo.db(dbname)
         collections = db.collections.select { |c| spec.key?(c.name) }
 
-        collections.each do |collection|
+        Parallel.each(collections, in_threads: 4) do |collection|
           ns = "#{dbname}.#{collection.name}"
-          import_collection(ns, collection, spec[collection.name][:meta][:filter])
+          begin
+            import_collection(ns, collection, spec[collection.name][:meta][:filter])
+          rescue Exception => ex
+            log.error("Error importing collection #{ns} - #{ex.message}:\n#{ex.backtrace.join("\n")}")
+          end
           exit(0) if @done
         end
       end
 
-      tailer.save_state(start_state) unless options[:skip_tail]
+      tailer.save_state(start_state)
     end
 
     def did_truncate; @did_truncate ||= {}; end
 
+    def upsert_all_batches(batches, ns)
+      # We use all_table_names_for_ns so we can ensure we write the parent table
+      # before we write the child.
+      sql_time = 0
+      @schema.all_table_names_for_ns(ns).map do |table_name|
+        unless batches[table_name].empty?
+          sql_time += track_time do
+            bulk_upsert(@sql.table_for_ident(table_name), ns,
+                        batches[table_name])
+            batches[table_name].clear
+          end
+        end
+      end
+      sql_time
+    end
+
     def import_collection(ns, collection, filter)
       log.info("Importing for #{ns}...")
       count = 0
-      batch = []
-      table = @sql.table_for_ns(ns)
+      batches = Hash[@schema.all_table_names_for_ns(ns).map { |n| [n, []] }]
+      table = @sql.table_for_ident(@schema.primary_table_name_for_ns(ns))
       unless options[:no_drop_tables] || did_truncate[table.first_source]
-        table.truncate
+        table.truncate :cascade => true
         did_truncate[table.first_source] = true
       end
@@ -143,26 +152,25 @@ def import_collection(ns, collection, filter)
       sql_time = 0
       collection.find(filter, :batch_size => BATCH) do |cursor|
         with_retries do
-          cursor.each do |obj|
-            batch << @schema.transform(ns, obj)
+          @schema.all_transforms_for_ns(ns, cursor) do |ident, _, row|
+            table = @sql.table_for_ident(ident)
             count += 1
+            batches[ident] << row
 
-            if batch.length >= BATCH
-              sql_time += track_time do
-                bulk_upsert(table, ns, batch)
-              end
+            if count % BATCH == 0
+              sql_time += upsert_all_batches(batches, ns)
               elapsed = Time.now - start
-              log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
-              batch.clear
-              exit(0) if @done
+              log.info("Imported #{count} rows into #{ns} (#{elapsed}s, #{sql_time}s SQL)...")
             end
+            exit(0) if @done
           end
         end
       end
 
-      unless batch.empty?
-        bulk_upsert(table, ns, batch)
-      end
+      sql_time += upsert_all_batches(batches, ns)
+
+      elapsed = Time.now - start
+      log.info("Finished import of #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
     end
 
     def optail
@@ -172,10 +180,11 @@ def optail
       end
       tailer.tail(:from => tail_from, :filter => options[:oplog_filter])
       until @done
-        tailer.stream(1000) do |op|
+        tailer.stream(5) do |op|
           handle_op(op)
         end
       end
+      tailer.save_state
     end
 
     def sync_object(ns, selector)
@@ -231,22 +240,12 @@ def handle_op(op)
           log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
           sync_object(ns, selector)
         else
-
           # The update operation replaces the existing object, but
           # preserves its _id field, so grab the _id off of the
           # 'query' field -- it's not guaranteed to be present on the
           # update.
-          primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
-          schema = @schema.find_ns!(ns)
-          keys = {}
-          primary_sql_keys.each do |key|
-            source = schema[:columns].find {|c| c[:name] == key }[:source]
-            keys[source] = selector[source]
-          end
-
-          log.debug("upsert #{ns}: #{keys}")
+          update = @schema.save_all_pks_for_ns(ns, update, selector)
 
-          update = keys.merge(update)
           unsafe_handle_exceptions(ns, update) do
             @sql.upsert_ns(ns, update)
           end
diff --git a/lib/mosql/version.rb b/lib/mosql/version.rb
index f4449b1..6f5783b 100644
--- a/lib/mosql/version.rb
+++ b/lib/mosql/version.rb
@@ -1,3 +1,3 @@
 module MoSQL
-  VERSION = "0.4.3"
+  VERSION = "0.4.5"
 end
diff --git a/mosql.gemspec b/mosql.gemspec
index 8089b30..2bb14b3 100644
--- a/mosql.gemspec
+++ b/mosql.gemspec
@@ -21,6 +21,7 @@ Gem::Specification.new do |gem|
   gem.add_runtime_dependency "rake"
   gem.add_runtime_dependency "log4r"
   gem.add_runtime_dependency "json"
+  gem.add_runtime_dependency "parallel"
 
   gem.add_runtime_dependency "mongoriver", "0.4"
diff --git a/test/unit/lib/mosql/schema.rb b/test/unit/lib/mosql/schema.rb
index 399008e..2e52a5a 100644
--- a/test/unit/lib/mosql/schema.rb
+++ b/test/unit/lib/mosql/schema.rb
@@ -104,8 +104,8 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
   end
 
   it 'Can find the primary key of the SQL table' do
-    assert_equal(['id'], @map.primary_sql_key_for_ns('db.collection'))
-    assert_equal(['_id'], @map.primary_sql_key_for_ns('db.old_conf_syntax'))
+    assert_equal(['id'], @map.primary_sql_keys_for_ns('db.collection'))
+    assert_equal(['_id'], @map.primary_sql_keys_for_ns('db.old_conf_syntax'))
   end
 
   it 'can create a SQL schema' do
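
Example (not part of the patch): a minimal sketch of how the new subtable support added above is expected to fan one MongoDB document out into parent and child rows. The `db.posts`/`comments` mapping and the document are invented for illustration; the entry points (`MoSQL::Schema.new`, `all_transforms_for_ns`) are the ones introduced in this diff.

require 'mosql'

# A collection map in the new format: any key under a collection other than
# :columns/:meta (here "comments") declares a subtable sourced from that
# embedded field. Table and column names here are hypothetical.
map = {
  'db' => {
    'posts' => {
      :columns => [
        { 'id' => nil, :source => '_id', :type => 'TEXT' },
        { 'title' => 'TEXT' }
      ],
      :meta => { :table => 'posts' },
      'comments' => {
        :columns => [
          { 'id' => nil, :source => '_id', :type => 'TEXT' },
          { 'body' => 'TEXT' }
        ],
        :meta => { :table => 'comments' }
      }
    }
  }
}

schema = MoSQL::Schema.new(map)

doc = { '_id' => 'p1', 'title' => 'hello',
        'comments' => [{ '_id' => 'c1', 'body' => 'hi' }] }

# Yields one (table, pks, row) triple per table: first the posts row, then one
# comments row per embedded comment, carrying the parent-scoped foreign key
# :post_id derived by parent_scope_column("posts", :id) via the inflector's
# singularize. The streamer batches these triples per table and writes parents
# before children, matching the FK order guaranteed by all_table_names_for_ns.
schema.all_transforms_for_ns('db.posts', [doc]) do |table, pks, row|
  puts "#{table}: pks=#{pks.inspect} row=#{row.inspect}"
end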