mirror of
https://git.bsd.gay/fef/nyastodon.git
synced 2025-01-25 04:54:06 +01:00
4ac78e2a06
* Add account statuses cleanup policy model * Record last inspected toot to delete to speed up successive calls to statuses_to_delete * Add service to cleanup a given account's statuses within a budget * Add worker to go through account policies and delete old toots * Fix last inspected status id logic All existing statuses older or equal to last inspected status id must be kept by the current policy. This is an invariant that must be kept so that resuming deletion from the last inspected status remains sound. * Add tests * Refactor scheduler and add tests * Add user interface * Add support for discriminating based on boosts/favs * Add UI support for min_reblogs and min_favs, rework UI * Address first round of review comments * Replace Snowflake#id_at_start with with_random parameter * Add tests * Add tests for StatusesCleanupController * Rework settings page * Adjust load-avoiding mechanisms * Please CodeClimate
96 lines
3.4 KiB
Ruby
96 lines
3.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Periodic Sidekiq job that walks the enabled account statuses cleanup
# policies and asks AccountStatusesCleanupService to process each of them,
# within a global budget and a per-account budget, and only when the Sidekiq
# queues are not already under load.
class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # This also helps limit the running time of the scheduler itself.
  MAX_BUDGET = 50

  # This is an attempt to spread the load across instances, as various
  # accounts are likely to have various followers.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular instance can handle.
  PER_THREAD_BUDGET = 5

  # Those avoid loading an instance that is already under load
  MAX_DEFAULT_SIZE    = 2
  MAX_DEFAULT_LATENCY = 5
  MAX_PUSH_SIZE       = 5
  MAX_PUSH_LATENCY    = 10
  # 'pull' queue has lower priority jobs, and it's unlikely that pushing
  # deletes would cause many issues with this queue if it didn't cause issues
  # with default and push. Yet, do not enqueue deletes if the instance is
  # lagging behind too much.
  MAX_PULL_SIZE       = 500
  MAX_PULL_LATENCY    = 300

  # This is less of an issue in general, but deleting old statuses is likely
  # to cause delivery errors, and thus increase the number of jobs to be retried.
  # This doesn't directly translate to load, but connection errors and a high
  # number of dead instances may lead to this spiraling out of control if
  # unchecked.
  MAX_RETRY_SIZE = 50_000

  # Redis key holding the id of the policy at which the previous run stopped
  # because its budget was exhausted.
  LAST_PROCESSED_ID_KEY = 'account_statuses_cleanup_scheduler:last_account_id'

  sidekiq_options retry: 0, lock: :until_executed

  # Entry point of the job.
  #
  # Does nothing when the instance is under load or when there is no budget.
  # Otherwise iterates over enabled policies in ascending id order, resuming
  # after the policy id saved by the previous run, and hands each policy to
  # AccountStatusesCleanupService with at most PER_ACCOUNT_BUDGET. The value
  # returned by the service is treated as the number of deleted statuses and
  # is subtracted from the remaining budget.
  def perform
    return if under_load?

    budget = compute_budget
    # Without budget nothing can be deleted; bail out early so the resume
    # cursor is not moved by a run that did no work.
    return unless budget.positive?

    first_policy_id = last_processed_id

    loop do
      num_processed_accounts = 0

      enabled_policies_after(first_policy_id).find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        num_processed_accounts += 1 unless num_deleted.zero?
        budget -= num_deleted

        unless budget.positive?
          # Remember where we stopped so that the next run resumes from there
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea here is to loop through all policies at least once until the budget is exhausted
      # and start back after the last processed account otherwise
      break if !budget.positive? || (num_processed_accounts.zero? && first_policy_id.nil?)

      # Subsequent passes start over from the first policy
      first_policy_id = nil
    end
  end

  # Returns the number of statuses this run may delete: PER_THREAD_BUDGET for
  # each Sidekiq thread of the processes serving the 'push' queue, capped at
  # MAX_BUDGET.
  def compute_budget
    push_processes = Sidekiq::ProcessSet.new.select { |process| process['queues'].include?('push') }
    threads        = push_processes.sum { |process| process['concurrency'] }

    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end

  # Returns true when the retry set is larger than MAX_RETRY_SIZE or when any
  # of the 'default', 'push' or 'pull' queues exceeds its size or latency
  # threshold.
  def under_load?
    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE

    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) ||
      queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) ||
      queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
  end

  private

  # Enabled policies, restricted to those with an id strictly greater than
  # +policy_id+ when one is given.
  #
  # `where` returns a new relation, so the result has to be kept, and the
  # filter is on the policy's own id because that is what
  # save_last_processed_id stores.
  def enabled_policies_after(policy_id)
    scope = AccountStatusesCleanupPolicy.where(enabled: true)
    return scope if policy_id.blank?

    scope.where(AccountStatusesCleanupPolicy.arel_table[:id].gt(policy_id))
  end

  # Whether the named Sidekiq queue is above either of the given thresholds.
  def queue_under_load?(name, max_size, max_latency)
    queue = Sidekiq::Queue.new(name)
    queue.size > max_size || queue.latency > max_latency
  end

  # Id saved by a previous run, or nil when the key is absent.
  def last_processed_id
    Redis.current.get(LAST_PROCESSED_ID_KEY)
  end

  # Stores +id+ as the resume cursor with a one-hour expiry, or removes the
  # cursor when +id+ is nil.
  def save_last_processed_id(id)
    if id.nil?
      Redis.current.del(LAST_PROCESSED_ID_KEY)
    else
      Redis.current.set(LAST_PROCESSED_ID_KEY, id, ex: 1.hour.seconds)
    end
  end
end
|