03d59340da
* Fix Sidekiq warnings about JSON serialization. This occurs on every symbol argument we pass, and every symbol key in hashes, because Sidekiq expects strings instead. See https://github.com/mperham/sidekiq/pull/5071. We do not need to change how workers parse their arguments because this has not changed and we were already converting to symbols adequately or using `with_indifferent_access`. * Set Sidekiq to raise on unsafe arguments in test mode, in order to more easily catch issues that would produce warnings in production code.
129 lines
4.0 KiB
Ruby
129 lines
4.0 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Fans a status out to local recipients (home feeds, lists, mention
# notifications, conversations) and, when the status is broadcastable,
# publishes it to the public and hashtag streaming channels over Redis.
class FanOutOnWriteService < BaseService
  # Push a status into home and mentions feeds
  # @param [Status] status
  # @param [Hash] options
  # @option options [Boolean] update
  # @option options [Array<Integer>] silenced_account_ids
  def call(status, options = {})
    @status  = status
    @account = status.account
    @options = options

    check_race_condition!

    fan_out_to_local_recipients!
    fan_out_to_public_streams! if broadcastable?
  end

  private

  # Guards against status objects that were loaded with a null visibility.
  #
  # @raise [Mastodon::RaceConditionError] when the visibility is nil
  def check_race_condition!
    # I don't know why but at some point we had an issue where
    # this service was being executed with status objects
    # that had a null visibility - which should not be possible
    # since the column in the database is not nullable.
    #
    # This check re-queues the service to be run at a later time
    # with the full object, if something like it occurs
    # NOTE(review): the re-queueing itself is not visible here; presumably
    # the calling worker retries when this error is raised - confirm.

    raise Mastodon::RaceConditionError if @status.visibility.nil?
  end

  # Delivers the status to the author, to mentioned local accounts, and then
  # to the audience implied by the status visibility.
  def fan_out_to_local_recipients!
    deliver_to_self!
    notify_mentioned_accounts!

    case @status.visibility.to_sym
    when :public, :unlisted, :private
      deliver_to_all_followers!
      deliver_to_lists!
    when :limited
      deliver_to_mentioned_followers!
    else
      # Any remaining visibility additionally feeds the conversation.
      deliver_to_mentioned_followers!
      deliver_to_conversation!
    end
  end

  # Publishes the status to the hashtag and public streaming channels.
  def fan_out_to_public_streams!
    broadcast_to_hashtag_streams!
    broadcast_to_public_streams!
  end

  # Pushes the status into the author's own home feed, for local authors only.
  def deliver_to_self!
    FeedManager.instance.push_to_home(@account, @status, update: update?) if @account.local?
  end

  # Hands active mentions of local accounts to LocalNotificationWorker in
  # batches, skipping accounts listed in the `silenced_account_ids` option.
  def notify_mentioned_accounts!
    silenced_account_ids = @options[:silenced_account_ids] || []

    # Filter on account_id: the option carries account ids, so comparing it
    # against the mention's own primary key would exclude the wrong rows.
    mentions_to_notify = @status.active_mentions
                                .where.not(account_id: silenced_account_ids)
                                .joins(:account)
                                .merge(Account.local)
                                .select(:id, :account_id)
                                .reorder(nil)

    mentions_to_notify.find_in_batches do |mentions|
      LocalNotificationWorker.push_bulk(mentions) do |mention|
        [mention.account_id, mention.id, 'Mention', 'mention']
      end
    end
  end

  # Hands every follower returned by `followers_for_local_distribution` to
  # FeedInsertWorker, in batches, targeting the 'home' feed type.
  def deliver_to_all_followers!
    @account.followers_for_local_distribution.select(:id).reorder(nil).find_in_batches do |followers|
      FeedInsertWorker.push_bulk(followers) do |follower|
        # String keys keep the job arguments JSON-safe for Sidekiq.
        [@status.id, follower.id, 'home', { 'update' => update? }]
      end
    end
  end

  # Hands every list returned by `lists_for_local_distribution` to
  # FeedInsertWorker, in batches, targeting the 'list' feed type.
  def deliver_to_lists!
    @account.lists_for_local_distribution.select(:id).reorder(nil).find_in_batches do |lists|
      FeedInsertWorker.push_bulk(lists) do |list|
        [@status.id, list.id, 'list', { 'update' => update? }]
      end
    end
  end

  # Hands mentioned accounts that are also among the author's locally
  # distributed followers to FeedInsertWorker, targeting the 'home' feed type.
  def deliver_to_mentioned_followers!
    mentioned_followers = @status.mentions
                                 .joins(:account)
                                 .merge(@account.followers_for_local_distribution)
                                 .select(:id, :account_id)
                                 .reorder(nil)

    mentioned_followers.find_in_batches do |mentions|
      FeedInsertWorker.push_bulk(mentions) do |mention|
        [@status.id, mention.account_id, 'home', { 'update' => update? }]
      end
    end
  end

  # Publishes the payload on each tag's channel, plus the tag's `:local`
  # channel when the status is local.
  def broadcast_to_hashtag_streams!
    @status.tags.pluck(:name).each do |hashtag|
      channel = "timeline:hashtag:#{hashtag.mb_chars.downcase}"

      Redis.current.publish(channel, anonymous_payload)
      Redis.current.publish("#{channel}:local", anonymous_payload) if @status.local?
    end
  end

  # Publishes the payload on the public timeline channels. Replies to accounts
  # other than the author are not published.
  def broadcast_to_public_streams!
    return if @status.reply? && @status.in_reply_to_account_id != @account.id

    Redis.current.publish('timeline:public', anonymous_payload)
    Redis.current.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload)

    if @status.media_attachments.any?
      Redis.current.publish('timeline:public:media', anonymous_payload)
      Redis.current.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload)
    end
  end

  # Adds the status to the account conversation, except when handling an update.
  def deliver_to_conversation!
    AccountConversation.add_status(@account, @status) unless update?
  end

  # Serialized streaming payload, built once per service call and reused.
  # The event is `status.update` for updates and `update` otherwise.
  # NOTE(review): the nil second argument to InlineRenderer.render presumably
  # means "no current account" (hence "anonymous") - confirm.
  #
  # @return [String]
  def anonymous_payload
    @anonymous_payload ||= Oj.dump(
      event: update? ? :'status.update' : :update,
      payload: InlineRenderer.render(@status, nil, :status)
    )
  end

  # @return [Boolean, nil] the `:update` option as passed to #call
  def update?
    @options[:update]
  end

  # A status is broadcast only when it has public visibility, is not a reblog,
  # and its author is not silenced.
  def broadcastable?
    @status.public_visibility? && !@status.reblog? && !@account.silenced?
  end
end
|