diff --git a/app/models/notification.rb b/app/models/notification.rb index f1605f034..497c8371c 100644 --- a/app/models/notification.rb +++ b/app/models/notification.rb @@ -100,6 +100,8 @@ class Notification < ApplicationRecord validates :type, inclusion: { in: TYPES } + scope :filtered, -> { where(filtered: true) } + scope :unfiltered, -> { where(filtered: false) } scope :without_suspended, -> { joins(:from_account).merge(Account.without_suspended) } def type diff --git a/app/services/notifications_cleanup_service.rb b/app/services/notifications_cleanup_service.rb new file mode 100644 index 000000000..b1aa79058 --- /dev/null +++ b/app/services/notifications_cleanup_service.rb @@ -0,0 +1,103 @@ +# frozen_string_literal: true + +class NotificationsCleanupService + # This can be expensive, so instead of cleaning up everything + # at once, we limit the number of accounts per run and run + # this more often. + ACCOUNTS_PER_RUN = 5 + + # Unfiltered notifications do not need any additional + # processing and can be deleted via SQL. This means we + # can safely delete a large number in one run. + UNFILTERED_DELETES_PER_ACCOUNT = 100_000 + + # Filtered notifications need to update their associated + # notification request. So we need to call #destroy on them + # which means we can only delete a comparatively small number + # in one run. + FILTERED_DESTROYS_PER_ACCOUNT = 1_000 + + # Different types of notifications may have different + # policies of how much of them / how long to keep them around. + POLICY_BY_TYPE = { + default: { + keep_at_least: 20_000, + months_to_keep: 6, + }.freeze, + }.freeze + + def call(notification_type) + @notification_type = notification_type + + accounts_with_old_notifications = fetch_accounts_with_old_notifications + accounts_with_many_notifications = fetch_accounts_with_many_notifications + affected_accounts = accounts_with_old_notifications & accounts_with_many_notifications + + affected_accounts.take(ACCOUNTS_PER_RUN).each do |account_id| + base_query = construct_base_query(account_id) + + # Delete unfiltered notifications via SQL + base_query + .unfiltered + .limit(UNFILTERED_DELETES_PER_ACCOUNT) + .delete_all + + # Delete filtered notifications with '#destroy' to + # update notification requests. + base_query + .filtered + .limit(FILTERED_DESTROYS_PER_ACCOUNT) + .destroy_all + end + end + + private + + def policy + @policy ||= POLICY_BY_TYPE[@notification_type] || POLICY_BY_TYPE[:default] + end + + def fetch_accounts_with_old_notifications + Notification + .where(type: @notification_type) + .where(created_at: ...policy[:months_to_keep].months.ago) + .distinct + .pluck(:account_id) + end + + def fetch_accounts_with_many_notifications + Notification + .from( + Notification + .select('account_id, COUNT(*) AS total') + .where(type: @notification_type) + .group(:account_id) + .arel.as('totals') + ) + .where('totals.total > ?', policy[:keep_at_least]) + .pluck(:account_id) + end + + def find_min_created_at_to_keep(account_id) + Notification + .from( + Notification + .where(type: @notification_type) + .where(account_id: account_id) + .limit(policy[:keep_at_least]) + .order(created_at: :desc) + ) + .group(:account_id) + .minimum(:created_at)[account_id] + end + + def construct_base_query(account_id) + min_created_at_to_keep = find_min_created_at_to_keep(account_id) + + Notification + .where(account_id: account_id) + .where(type: @notification_type) + .where(notifications: { created_at: ...min_created_at_to_keep }) + .where(notifications: { created_at: ...policy[:months_to_keep].months.ago }) + end +end diff --git a/app/workers/concerns/low_priority_scheduler.rb b/app/workers/concerns/low_priority_scheduler.rb new file mode 100644 index 000000000..bc4885858 --- /dev/null +++ b/app/workers/concerns/low_priority_scheduler.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module LowPriorityScheduler + # These are latency limits on various queues above which a server is + # considered to be under load, causing the auto-deletion to be entirely + # skipped for that run. + LOAD_LATENCY_THRESHOLDS = { + default: 5, + push: 10, + # The `pull` queue has lower priority jobs, and it's unlikely that + # pushing deletes would cause much issues with this queue if it didn't + # cause issues with `default` and `push`. Yet, do not enqueue deletes + # if the instance is lagging behind too much. + pull: 5.minutes.to_i, + }.freeze + + def under_load? + LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) } + end + + private + + def queue_under_load?(name, max_latency) + Sidekiq::Queue.new(name).latency > max_latency + end +end diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index a2ab31cc5..1d44d9e83 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -3,6 +3,7 @@ class Scheduler::AccountsStatusesCleanupScheduler include Sidekiq::Worker include Redisable + include LowPriorityScheduler # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. @@ -19,19 +20,6 @@ class Scheduler::AccountsStatusesCleanupScheduler # jobs to something the particular server can handle. PER_THREAD_BUDGET = 5 - # These are latency limits on various queues above which a server is - # considered to be under load, causing the auto-deletion to be entirely - # skipped for that run. - LOAD_LATENCY_THRESHOLDS = { - default: 5, - push: 10, - # The `pull` queue has lower priority jobs, and it's unlikely that - # pushing deletes would cause much issues with this queue if it didn't - # cause issues with `default` and `push`. Yet, do not enqueue deletes - # if the instance is lagging behind too much. - pull: 5.minutes.to_i, - }.freeze - sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i def perform @@ -91,10 +79,6 @@ class Scheduler::AccountsStatusesCleanupScheduler [PER_THREAD_BUDGET * threads, MAX_BUDGET].min end - def under_load? - LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) } - end - private def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration) @@ -113,10 +97,6 @@ class Scheduler::AccountsStatusesCleanupScheduler end end - def queue_under_load?(name, max_latency) - Sidekiq::Queue.new(name).latency > max_latency - end - def last_processed_id redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i end diff --git a/app/workers/scheduler/notifications_cleanup_scheduler.rb b/app/workers/scheduler/notifications_cleanup_scheduler.rb new file mode 100644 index 000000000..c6bd030d7 --- /dev/null +++ b/app/workers/scheduler/notifications_cleanup_scheduler.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class Scheduler::NotificationsCleanupScheduler + include Sidekiq::Worker + include LowPriorityScheduler + + TYPES_TO_CLEAN_UP = Notification::TYPES + + sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i + + def perform + return if under_load? + + TYPES_TO_CLEAN_UP.each do |type| + NotificationsCleanupService.new.call(type) + end + end +end diff --git a/config/sidekiq.yml b/config/sidekiq.yml index 488c2f2ab..aff16fbd0 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -67,3 +67,7 @@ interval: 1 hour class: Scheduler::AutoCloseRegistrationsScheduler queue: scheduler + notifications_cleanup_scheduler: + interval: 4 hours + class: Scheduler::NotificationsCleanupScheduler + queue: scheduler