diff --git a/lib/deimos/utils/db_poller.rb b/lib/deimos/utils/db_poller.rb index 72191c39..f8d97c41 100644 --- a/lib/deimos/utils/db_poller.rb +++ b/lib/deimos/utils/db_poller.rb @@ -109,7 +109,12 @@ def process_updates # by the producer itself. loop do Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}") - batch = fetch_results(time_from, time_to).to_a + batch = if batch_count == 0 + # set min_id to 0 on the initial fetch of results + fetch_results(time_from, time_to, 0).to_a + else + fetch_results(time_from, time_to).to_a + end break if batch.empty? batch_count += 1 @@ -117,20 +122,24 @@ def process_updates message_count += batch.size time_from = last_updated(batch.last) end + # Update last_sent to time_to for this set of updates + @info.last_sent = time_to + @info.save! Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches}") end # @param time_from [ActiveSupport::TimeWithZone] # @param time_to [ActiveSupport::TimeWithZone] # @return [ActiveRecord::Relation] - def fetch_results(time_from, time_to) + def fetch_results(time_from, time_to, min_id=nil) id = @producer.config[:record_class].primary_key quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column) quoted_id = ActiveRecord::Base.connection.quote_column_name(id) + min_id = min_id.present? ? min_id : @info.last_sent_id @producer.poll_query(time_from: time_from, time_to: time_to, column_name: @config.timestamp_column, - min_id: @info.last_sent_id). + min_id: min_id). limit(BATCH_SIZE). order("#{quoted_timestamp}, #{quoted_id}") end