diff --git a/app/chewy/accounts_index.rb b/app/chewy/accounts_index.rb index e38e14a106..763958a3f9 100644 --- a/app/chewy/accounts_index.rb +++ b/app/chewy/accounts_index.rb @@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index }, } - index_scope ::Account.searchable.includes(:account_stat) + index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? } root date_detection: false do field :id, type: 'long' @@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content' end - field :following_count, type: 'long', value: ->(account) { account.following_count } - field :followers_count, type: 'long', value: ->(account) { account.followers_count } + field :following_count, type: 'long', value: ->(account) { account.following.local.count } + field :followers_count, type: 'long', value: ->(account) { account.followers.local.count } field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at } end end diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb index 6dd4fb18b0..c200098799 100644 --- a/app/chewy/statuses_index.rb +++ b/app/chewy/statuses_index.rb @@ -33,8 +33,6 @@ class StatusesIndex < Chewy::Index }, } - # We do not use delete_if option here because it would call a method that we - # expect to be called with crutches without crutches, causing n+1 queries index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) crutch :mentions do |collection| diff --git a/app/chewy/tags_index.rb b/app/chewy/tags_index.rb index df3d9e4cce..a5b139bcaa 100644 --- a/app/chewy/tags_index.rb +++ b/app/chewy/tags_index.rb @@ -23,11 +23,7 @@ class TagsIndex < Chewy::Index }, } - index_scope ::Tag.listable - - crutch :time_period do - 7.days.ago.to_date..0.days.ago.to_date - end + index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? } root date_detection: false do field :name, type: 'text', analyzer: 'content' do @@ -35,7 +31,7 @@ class TagsIndex < Chewy::Index end field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? } - field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts } + field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } } field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at } end end diff --git a/app/lib/importer/accounts_index_importer.rb b/app/lib/importer/accounts_index_importer.rb deleted file mode 100644 index 792a31b1bd..0000000000 --- a/app/lib/importer/accounts_index_importer.rb +++ /dev/null @@ -1,30 +0,0 @@ -# frozen_string_literal: true - -class Importer::AccountsIndexImporter < Importer::BaseImporter - def import! - scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp| - in_work_unit(tmp) do |accounts| - bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body - - indexed = bulk.select { |entry| entry[:index] }.size - deleted = bulk.select { |entry| entry[:delete] }.size - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk) - - [indexed, deleted] - end - end - - wait! - end - - private - - def index - AccountsIndex - end - - def scope - Account.searchable - end -end diff --git a/app/lib/importer/base_importer.rb b/app/lib/importer/base_importer.rb deleted file mode 100644 index ea522c600c..0000000000 --- a/app/lib/importer/base_importer.rb +++ /dev/null @@ -1,87 +0,0 @@ -# frozen_string_literal: true - -class Importer::BaseImporter - # @param [Integer] batch_size - # @param [Concurrent::ThreadPoolExecutor] executor - def initialize(batch_size:, executor:) - @batch_size = batch_size - @executor = executor - @wait_for = Concurrent::Set.new - end - - # Callback to run when a concurrent work unit completes - # @param [Proc] - def on_progress(&block) - @on_progress = block - end - - # Callback to run when a concurrent work unit fails - # @param [Proc] - def on_failure(&block) - @on_failure = block - end - - # Reduce resource usage during and improve speed of indexing - def optimize_for_import! - Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } } - end - - # Restore original index settings - def optimize_for_search! - Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } } - end - - # Estimate the amount of documents that would be indexed. Not exact! - # @returns [Integer] - def estimate! - ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i } - end - - # Import data from the database into the index - def import! - raise NotImplementedError - end - - # Remove documents from the index that no longer exist in the database - def clean_up! - index.scroll_batches do |documents| - ids = documents.map { |doc| doc['_id'] } - existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true } - tmp = ids.reject { |id| existence_map[id] } - - next if tmp.empty? - - in_work_unit(tmp) do |deleted_ids| - bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk) - - [0, bulk.size] - end - end - - wait! - end - - protected - - def in_work_unit(*args, &block) - work_unit = Concurrent::Promises.future_on(@executor, *args, &block) - - work_unit.on_fulfillment!(&@on_progress) - work_unit.on_rejection!(&@on_failure) - work_unit.on_resolution! { @wait_for.delete(work_unit) } - - @wait_for << work_unit - rescue Concurrent::RejectedExecutionError - sleep(0.1) && retry # Backpressure - end - - def wait! - Concurrent::Promises.zip(*@wait_for).wait - end - - def index - raise NotImplementedError - end -end diff --git a/app/lib/importer/statuses_index_importer.rb b/app/lib/importer/statuses_index_importer.rb deleted file mode 100644 index 7c65325600..0000000000 --- a/app/lib/importer/statuses_index_importer.rb +++ /dev/null @@ -1,89 +0,0 @@ -# frozen_string_literal: true - -class Importer::StatusesIndexImporter < Importer::BaseImporter - def import! - # The idea is that instead of iterating over all statuses in the database - # and calculating the searchable_by for each of them (majority of which - # would be empty), we approach the index from the other end - - scopes.each do |scope| - # We could be tempted to keep track of status IDs we have already processed - # from a different scope to avoid indexing them multiple times, but that - # could end up being a very large array - - scope.find_in_batches(batch_size: @batch_size) do |tmp| - in_work_unit(tmp.map(&:status_id)) do |status_ids| - bulk = ActiveRecord::Base.connection_pool.with_connection do - Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body - end - - indexed = 0 - deleted = 0 - - # We can't use the delete_if proc to do the filtering because delete_if - # is called before rendering the data and we need to filter based - # on the results of the filter, so this filtering happens here instead - bulk.map! do |entry| - new_entry = begin - if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? - { delete: entry[:index].except(:data) } - else - entry - end - end - - if new_entry[:index] - indexed += 1 - else - deleted += 1 - end - - new_entry - end - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk) - - [indexed, deleted] - end - end - end - - wait! - end - - private - - def index - StatusesIndex - end - - def scopes - [ - local_statuses_scope, - local_mentions_scope, - local_favourites_scope, - local_votes_scope, - local_bookmarks_scope, - ] - end - - def local_mentions_scope - Mention.where(account: Account.local, silent: false).select(:id, :status_id) - end - - def local_favourites_scope - Favourite.where(account: Account.local).select(:id, :status_id) - end - - def local_bookmarks_scope - Bookmark.select(:id, :status_id) - end - - def local_votes_scope - Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id') - end - - def local_statuses_scope - Status.local.select('id, coalesce(reblog_of_id, id) as status_id') - end -end diff --git a/app/lib/importer/tags_index_importer.rb b/app/lib/importer/tags_index_importer.rb deleted file mode 100644 index f5bd8f052b..0000000000 --- a/app/lib/importer/tags_index_importer.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -class Importer::TagsIndexImporter < Importer::BaseImporter - def import! - index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| - in_work_unit(tmp) do |tags| - bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body - - indexed = bulk.select { |entry| entry[:index] }.size - deleted = bulk.select { |entry| entry[:delete] }.size - - Chewy::Index::Import::BulkRequest.new(index).perform(bulk) - - [indexed, deleted] - end - end - - wait! - end - - private - - def index - TagsIndex - end -end diff --git a/app/models/trends/history.rb b/app/models/trends/history.rb index 74723e35c9..608e337924 100644 --- a/app/models/trends/history.rb +++ b/app/models/trends/history.rb @@ -11,11 +11,11 @@ class Trends::History end def uses - with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum } + redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum end def accounts - with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) } + redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) end end @@ -33,21 +33,19 @@ class Trends::History attr_reader :day def accounts - with_redis { |redis| redis.pfcount(key_for(:accounts)) } + redis.pfcount(key_for(:accounts)) end def uses - with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 } + redis.get(key_for(:uses))&.to_i || 0 end def add(account_id) - with_redis do |redis| - redis.pipelined do |pipeline| - pipeline.incrby(key_for(:uses), 1) - pipeline.pfadd(key_for(:accounts), account_id) - pipeline.expire(key_for(:uses), EXPIRE_AFTER) - pipeline.expire(key_for(:accounts), EXPIRE_AFTER) - end + redis.pipelined do + redis.incrby(key_for(:uses), 1) + redis.pfadd(key_for(:accounts), account_id) + redis.expire(key_for(:uses), EXPIRE_AFTER) + redis.expire(key_for(:accounts), EXPIRE_AFTER) end end diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb index b579ebc143..74f980ba11 100644 --- a/lib/mastodon/search_cli.rb +++ b/lib/mastodon/search_cli.rb @@ -16,21 +16,19 @@ module Mastodon StatusesIndex, ].freeze - option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads' - option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch' + option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' + option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch' option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' - option :import, type: :boolean, default: true, desc: 'Import data from the database to the index' - option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index' desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them' long_desc <<~LONG_DESC If Elasticsearch is empty, this command will create the necessary indices and then import data from the database into those indices. This command will also upgrade indices if the underlying schema has been - changed since the last run. Index upgrades erase index data. + changed since the last run. Even if creating or upgrading indices is not necessary, data from the - database will be imported into the indices, unless overriden with --no-import. + database will be imported into the indices. LONG_DESC def deploy if options[:concurrency] < 1 @@ -51,9 +49,7 @@ module Mastodon end end - pool = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10) - importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) } - progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) + progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) # First, ensure all indices are created and have the correct # structure, so that live data can already be written @@ -63,46 +59,99 @@ module Mastodon index.specification.lock! end - progress.title = 'Estimating workload ' - progress.total = indices.sum { |index| importers[index].estimate! } - reset_connection_pools! - added = 0 - removed = 0 + pool = Concurrent::FixedThreadPool.new(options[:concurrency]) + added = Concurrent::AtomicFixnum.new(0) + removed = Concurrent::AtomicFixnum.new(0) + progress.title = 'Estimating workload ' + + # Estimate the amount of data that has to be imported first + progress.total = indices.sum { |index| index.adapter.default_scope.count } + + # Now import all the actual data. Mind that unlike chewy:sync, we don't + # fetch and compare all record IDs from the database and the index to + # find out which to add and which to remove from the index. Because with + # potentially millions of rows, the memory footprint of such a calculation + # is uneconomical. So we only ever add. indices.each do |index| - importer = importers[index] - importer.optimize_for_import! + progress.title = "Importing #{index} " + batch_size = options[:batch_size] + slice_size = (batch_size / options[:concurrency]).ceil - importer.on_progress do |(indexed, deleted)| - progress.total = nil if progress.progress + indexed + deleted > progress.total - progress.progress += indexed + deleted - added += indexed - removed += deleted - end + index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch| + futures = [] - importer.on_failure do |reason| - progress.log(pastel.red("Error while importing #{index}: #{reason}")) - end + batch.each_slice(slice_size) do |records| + futures << Concurrent::Future.execute(executor: pool) do + begin + if !progress.total.nil? && progress.progress + records.size > progress.total + # The number of items has changed between start and now, + # since there is no good way to predict the final count from + # here, just change the progress bar to an indeterminate one - if options[:import] - progress.title = "Importing #{index} " - importer.import! - end + progress.total = nil + end - if options[:clean] - progress.title = "Cleaning #{index} " - importer.clean_up! + grouped_records = nil + bulk_body = nil + index_count = 0 + delete_count = 0 + + ActiveRecord::Base.connection_pool.with_connection do + grouped_records = records.to_a.group_by do |record| + index.adapter.send(:delete_from_index?, record) ? :delete : :to_index + end + + bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body + end + + index_count = grouped_records[:to_index].size if grouped_records.key?(:to_index) + delete_count = grouped_records[:delete].size if grouped_records.key?(:delete) + + # The following is an optimization for statuses specifically, since + # we want to de-index statuses that cannot be searched by anybody, + # but can't use Chewy's delete_if logic because it doesn't use + # crutches and our searchable_by logic depends on them + if index == StatusesIndex + bulk_body.map! do |entry| + if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank? + index_count -= 1 + delete_count += 1 + + { delete: entry[:to_index].except(:data) } + else + entry + end + end + end + + Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body) + + progress.progress += records.size + + added.increment(index_count) + removed.increment(delete_count) + + sleep 1 + rescue => e + progress.log pastel.red("Error importing #{index}: #{e}") + ensure + RedisConfiguration.pool.checkin if Thread.current[:redis] + Thread.current[:redis] = nil + end + end + end + + futures.map(&:value) end - ensure - importer.optimize_for_search! end - progress.title = 'Done! ' - progress.finish + progress.title = '' + progress.stop - say("Indexed #{added} records, de-indexed #{removed}", :green, true) + say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true) end end end