Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ GIT
PATH
remote: .
specs:
<<<<<<< HEAD
multiwoven-integrations (0.35.0)
=======
multiwoven-integrations (0.35.5)
>>>>>>> d9813065c (fix(CE): Added a record indentifier for batch support (#1885))
MailchimpMarketing
activesupport
async-websocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Integrations::Core
class DestinationConnector < BaseConnector
# Records are transformed json payload send it to the destination
# SyncConfig is the Protocol::SyncConfig object
def write(_sync_config, _records, _action = "destination_insert")
def write(_sync_config, _records, _action = "destination_insert", _identifier_key = nil)
raise "Not implemented"
# return Protocol::TrackingMessage
end
Expand Down
6 changes: 4 additions & 2 deletions integrations/lib/multiwoven/integrations/core/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,18 @@ def report_exception(exception, meta = {})
reporter&.report(exception, meta)
end

def log_request_response(level, request, response)
def log_request_response(level, request, response, record_identifier = nil)
Integrations::Protocol::LogMessage.new(
record_identifier: record_identifier,
name: self.class.name,
level: level,
message: { request: request.to_s, response: response.to_s, level: level }.to_json
)
end

def create_log_message(context, type, message)
def create_log_message(context, type, message, record_identifier = nil)
Integrations::Protocol::LogMessage.new(
record_identifier: record_identifier,
name: context,
level: type,
message: message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "create")
def write(sync_config, records, _action = "create", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
api_key = connection_config[:api_key]
url = sync_config.stream.url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def discover(connection_config)
db&.close
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
records_size = records.size
log_message_array = []
write_success = upload_csv_content(sync_config, records)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def discover(connection_config)
)
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = "#{connection_config[:catalog]}.#{connection_config[:schema]}.#{sync_config.stream.name}"
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
access_token = connection_config[:access_token]
url = generate_url(sync_config, connection_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
setup_write_environment(sync_config, action)
process_record_chunks(records, sync_config)
rescue StandardError => e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "create")
def write(sync_config, records, _action = "create", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
url = connection_config[:destination_url]
headers = connection_config[:headers]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@action = sync_config.stream.action || action
@sync_config = sync_config
initialize_client(sync_config.destination.connection_specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def discover(_connection_config = nil)
)
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@action = sync_config.stream.action || action
connection_config = sync_config.destination.connection_specification.with_indifferent_access
initialize_client(connection_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
connection_config = connection_config.with_indifferent_access
url = sync_config.stream.url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "create")
def write(sync_config, records, _action = "create", _identifier_key = nil)
@sync_config = sync_config
initialize_client(sync_config.destination.connection_specification)
process_records(records, sync_config.stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def discover(connection_config)
)
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "upsert")
def write(sync_config, records, _action = "upsert", _identifier_key = nil)
@sync_config = sync_config
stream = @sync_config.stream
connection_config = @sync_config.destination.connection_specification.with_indifferent_access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
token = connection_config[:token]
file_name = sync_config.stream.name.split(", ").first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def discover(connection_config)
db_client&.close
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
create_connection(connection_config)
model = sync_config.stream.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def discover(connection_config)
)
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "upsert")
def write(sync_config, records, _action = "upsert", _identifier_key = nil)
@sync_config = sync_config
connection_config = sync_config.destination.connection_specification.with_indifferent_access
create_connection(connection_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,32 @@ def discover(connection_config)
db&.close
end

def write(sync_config, records, action = "destination_insert")
def write(sync_config, records, action = "destination_insert", identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
raw_table = sync_config.stream.name
table_name = qualify_table(connection_config[:schema], raw_table)
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)
<<<<<<< HEAD
=======
primary_key = fetch_primary_key(db, connection_config[:schema], raw_table)
opts = { action: action, identifier_key: identifier_key, sync_config: sync_config }
>>>>>>> d9813065c (fix(CE): Added a record indentifier for batch support (#1885))

write_success = 0
write_failure = 0
log_message_array = []

records.each_slice(MAX_CHUNK_SIZE) do |chunk|
bulk_write(db, table_name, chunk, primary_key, action)
bulk_write(db, table_name, chunk, primary_key, opts)
write_success += chunk.size
log_message_array << log_request_response("info", "bulk_#{action}", "#{chunk.size} rows")
rescue StandardError => e
logger.warn("POSTGRESQL:BULK_WRITE:FALLBACK chunk_size=#{chunk.size} error=#{e.message}")
chunk.each do |record|
response = bulk_write(db, table_name, [record], primary_key, action)
write_success += 1
log_message_array << log_request_response("info", "fallback_#{action}", response)
rescue StandardError => individual_error
handle_exception(individual_error, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", "fallback_#{action}", individual_error.message)
end
success, failure, logs = write_chunk_fallback(db, table_name, chunk, primary_key, opts)
write_success += success
write_failure += failure
log_message_array.concat(logs)
end

tracking_message(write_success, write_failure, log_message_array)
Expand All @@ -94,20 +89,60 @@ def write(sync_config, records, action = "destination_insert")

private

def bulk_write(db, table_name, records, primary_key, action)
def write_chunk_fallback(db, table_name, chunk, primary_key, opts)
action = opts[:action]
identifier_key = opts[:identifier_key]
sync_config = opts[:sync_config]
success = 0
failure = 0
logs = []

chunk.each do |record|
response = bulk_write(db, table_name, [record], primary_key, opts)
success += 1
logs << log_request_response("info", "fallback_#{action}", response, record[identifier_key])
rescue StandardError => individual_error # rubocop:disable Naming/RescuedExceptionsVariableName
handle_exception(individual_error, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
failure += 1
logs << log_request_response("error", "fallback_#{action}", individual_error.message, record[identifier_key])
end

[success, failure, logs]
end

def bulk_write(db, table_name, records, primary_key, opts)
action = opts[:action]
identifier_key = opts[:identifier_key]
return if records.empty?

columns = records.flat_map(&:keys).uniq
columns -= [identifier_key] if identifier_key
col_list = columns.map { |c| quote_ident(c) }.join(", ")

values_clauses = records.map do |record|
vals = columns.map { |col| escape_value(db, record[col]) }
"(#{vals.join(", ")})"
end

<<<<<<< HEAD
sql = "INSERT INTO #{table_name} (#{col_list}) VALUES #{values_clauses.join(", ")}"
sql += build_upsert_clause(columns, primary_key) if action.to_s == "destination_update"
db.exec(sql)
=======
conflict = build_conflict_clause(primary_key, action, columns)
db.exec("INSERT INTO #{table_name} (#{col_list}) VALUES #{values_clauses.join(", ")}#{conflict}")
end

def build_conflict_clause(primary_key, action, columns)
return "" unless primary_key.present?

action.to_s == "destination_insert" ? build_safe_insert_clause(primary_key) : build_upsert_clause(columns, primary_key)
>>>>>>> d9813065c (fix(CE): Added a record indentifier for batch support (#1885))
end

def build_upsert_clause(columns, primary_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def discover(connection_config = nil)
})
end

def write(sync_config, records, _action = "upsert")
def write(sync_config, records, _action = "upsert", _identifier_key = nil)
connection_config = sync_config.destination.connection_specification.with_indifferent_access
collection_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@action = sync_config.stream.action || action
@sync_config = sync_config
initialize_client(sync_config.destination.connection_specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@action = sync_config.stream.action || action
@sync_config = sync_config
initialize_client(sync_config.destination.connection_specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
@sync_config = sync_config
connection_config = sync_config.destination.connection_specification.with_indifferent_access
file_path = generate_file_path(sync_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
# Currently as we only create a message for each record in slack, we are not using actions.
# This will be changed in future.
@sync_config = sync_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def discover(_connection_config = nil)
})
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@sync_config = sync_config
@action = sync_config.stream.action || action
initialize_client(sync_config.destination.connection_specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def discover(connection_config)
})
end

def write(sync_config, records, _action = "destination_insert")
def write(sync_config, records, _action = "destination_insert", _identifier_key = nil)
write_success = 0
write_failure = 0
log_message_array = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def discover(_connection_config = nil)
failure_status(e)
end

def write(sync_config, records, action = "create")
def write(sync_config, records, action = "create", _identifier_key = nil)
@sync_config = sync_config
@action = sync_config.stream.action || action
initialize_client(sync_config.destination.connection_specification)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Connector < ProtocolModel

class LogMessage < ProtocolModel
attribute :level, LogLevel
attribute? :record_identifier, Types::String.optional
attribute :message, Types::String
attribute? :name, Types::String.optional
attribute? :stack_trace, Types::String.optional
Expand Down
Loading
Loading