Remove Salmon and PubSubHubbub (#11205)
* Remove Salmon and PubSubHubbub endpoints * Add error when trying to follow OStatus accounts * Fix new accounts not being created in ResolveAccountService
This commit is contained in:
parent
c07cca4727
commit
23aeef52cc
102 changed files with 70 additions and 3569 deletions
|
@ -5,27 +5,5 @@ class AfterRemoteFollowRequestWorker
|
|||
|
||||
sidekiq_options queue: 'pull', retry: 5
|
||||
|
||||
attr_reader :follow_request
|
||||
|
||||
def perform(follow_request_id)
|
||||
@follow_request = FollowRequest.find(follow_request_id)
|
||||
process_follow_service if processing_required?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_follow_service
|
||||
follow_request.destroy
|
||||
FollowService.new.call(follow_request.account, updated_account.acct)
|
||||
end
|
||||
|
||||
def processing_required?
|
||||
!updated_account.nil? && !updated_account.locked?
|
||||
end
|
||||
|
||||
def updated_account
|
||||
@_updated_account ||= FetchRemoteAccountService.new.call(follow_request.target_account.remote_url)
|
||||
end
|
||||
def perform(follow_request_id); end
|
||||
end
|
||||
|
|
|
@ -5,27 +5,5 @@ class AfterRemoteFollowWorker
|
|||
|
||||
sidekiq_options queue: 'pull', retry: 5
|
||||
|
||||
attr_reader :follow
|
||||
|
||||
def perform(follow_id)
|
||||
@follow = Follow.find(follow_id)
|
||||
process_follow_service if processing_required?
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_follow_service
|
||||
follow.destroy
|
||||
FollowService.new.call(follow.account, updated_account.acct)
|
||||
end
|
||||
|
||||
def updated_account
|
||||
@_updated_account ||= FetchRemoteAccountService.new.call(follow.target_account.remote_url)
|
||||
end
|
||||
|
||||
def processing_required?
|
||||
!updated_account.nil? && updated_account.locked?
|
||||
end
|
||||
def perform(follow_id); end
|
||||
end
|
||||
|
|
|
@ -5,7 +5,5 @@ class NotificationWorker
|
|||
|
||||
sidekiq_options queue: 'push', retry: 5
|
||||
|
||||
def perform(xml, source_account_id, target_account_id)
|
||||
SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id))
|
||||
end
|
||||
def perform(xml, source_account_id, target_account_id); end
|
||||
end
|
||||
|
|
|
@ -5,7 +5,5 @@ class ProcessingWorker
|
|||
|
||||
sidekiq_options backtrace: true
|
||||
|
||||
def perform(account_id, body)
|
||||
ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true)
|
||||
end
|
||||
def perform(account_id, body); end
|
||||
end
|
||||
|
|
|
@ -2,81 +2,8 @@
|
|||
|
||||
class Pubsubhubbub::ConfirmationWorker
|
||||
include Sidekiq::Worker
|
||||
include RoutingHelper
|
||||
|
||||
sidekiq_options queue: 'push', retry: false
|
||||
|
||||
attr_reader :subscription, :mode, :secret, :lease_seconds
|
||||
|
||||
def perform(subscription_id, mode, secret = nil, lease_seconds = nil)
|
||||
@subscription = Subscription.find(subscription_id)
|
||||
@mode = mode
|
||||
@secret = secret
|
||||
@lease_seconds = lease_seconds
|
||||
process_confirmation
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_confirmation
|
||||
prepare_subscription
|
||||
|
||||
callback_get_with_params
|
||||
logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}"
|
||||
|
||||
update_subscription
|
||||
end
|
||||
|
||||
def update_subscription
|
||||
if successful_subscribe?
|
||||
subscription.save!
|
||||
elsif successful_unsubscribe?
|
||||
subscription.destroy!
|
||||
end
|
||||
end
|
||||
|
||||
def successful_subscribe?
|
||||
subscribing? && response_matches_challenge?
|
||||
end
|
||||
|
||||
def successful_unsubscribe?
|
||||
(unsubscribing? && response_matches_challenge?) || !subscription.confirmed?
|
||||
end
|
||||
|
||||
def response_matches_challenge?
|
||||
@callback_response_body == challenge
|
||||
end
|
||||
|
||||
def subscribing?
|
||||
mode == 'subscribe'
|
||||
end
|
||||
|
||||
def unsubscribing?
|
||||
mode == 'unsubscribe'
|
||||
end
|
||||
|
||||
def callback_get_with_params
|
||||
Request.new(:get, subscription.callback_url, params: callback_params).perform do |response|
|
||||
@callback_response_body = response.body_with_limit
|
||||
end
|
||||
end
|
||||
|
||||
def callback_params
|
||||
{
|
||||
'hub.topic': account_url(subscription.account, format: :atom),
|
||||
'hub.mode': mode,
|
||||
'hub.challenge': challenge,
|
||||
'hub.lease_seconds': subscription.lease_seconds,
|
||||
}
|
||||
end
|
||||
|
||||
def prepare_subscription
|
||||
subscription.secret = secret
|
||||
subscription.lease_seconds = lease_seconds
|
||||
subscription.confirmed = true
|
||||
end
|
||||
|
||||
def challenge
|
||||
@_challenge ||= SecureRandom.hex
|
||||
end
|
||||
def perform(subscription_id, mode, secret = nil, lease_seconds = nil); end
|
||||
end
|
||||
|
|
|
@ -2,80 +2,8 @@
|
|||
|
||||
class Pubsubhubbub::DeliveryWorker
|
||||
include Sidekiq::Worker
|
||||
include RoutingHelper
|
||||
|
||||
sidekiq_options queue: 'push', retry: 3, dead: false
|
||||
|
||||
sidekiq_retry_in do |count|
|
||||
5 * (count + 1)
|
||||
end
|
||||
|
||||
attr_reader :subscription, :payload
|
||||
|
||||
def perform(subscription_id, payload)
|
||||
@subscription = Subscription.find(subscription_id)
|
||||
@payload = payload
|
||||
process_delivery unless blocked_domain?
|
||||
rescue => e
|
||||
raise e.class, "Delivery failed for #{subscription&.callback_url}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_delivery
|
||||
callback_post_payload do |payload_delivery|
|
||||
raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery
|
||||
end
|
||||
|
||||
subscription.touch(:last_successful_delivery_at)
|
||||
end
|
||||
|
||||
def callback_post_payload(&block)
|
||||
request = Request.new(:post, subscription.callback_url, body: payload)
|
||||
request.add_headers(headers)
|
||||
request.perform(&block)
|
||||
end
|
||||
|
||||
def blocked_domain?
|
||||
DomainBlock.blocked?(host)
|
||||
end
|
||||
|
||||
def host
|
||||
Addressable::URI.parse(subscription.callback_url).normalized_host
|
||||
end
|
||||
|
||||
def headers
|
||||
{
|
||||
'Content-Type' => 'application/atom+xml',
|
||||
'Link' => link_header,
|
||||
}.merge(signature_headers.to_h)
|
||||
end
|
||||
|
||||
def link_header
|
||||
LinkHeader.new([hub_link_header, self_link_header]).to_s
|
||||
end
|
||||
|
||||
def hub_link_header
|
||||
[api_push_url, [%w(rel hub)]]
|
||||
end
|
||||
|
||||
def self_link_header
|
||||
[account_url(subscription.account, format: :atom), [%w(rel self)]]
|
||||
end
|
||||
|
||||
def signature_headers
|
||||
{ 'X-Hub-Signature' => payload_signature } if subscription.secret?
|
||||
end
|
||||
|
||||
def payload_signature
|
||||
"sha1=#{hmac_payload_digest}"
|
||||
end
|
||||
|
||||
def hmac_payload_digest
|
||||
OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload)
|
||||
end
|
||||
|
||||
def response_successful?(payload_delivery)
|
||||
payload_delivery.code > 199 && payload_delivery.code < 300
|
||||
end
|
||||
def perform(subscription_id, payload); end
|
||||
end
|
||||
|
|
|
@ -5,28 +5,5 @@ class Pubsubhubbub::DistributionWorker
|
|||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(stream_entry_ids)
|
||||
stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.hidden? }
|
||||
|
||||
return if stream_entries.empty?
|
||||
|
||||
@account = stream_entries.first.account
|
||||
@subscriptions = active_subscriptions.to_a
|
||||
|
||||
distribute_public!(stream_entries)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def distribute_public!(stream_entries)
|
||||
@payload = OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.feed(@account, stream_entries))
|
||||
|
||||
Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription_id|
|
||||
[subscription_id, @payload]
|
||||
end
|
||||
end
|
||||
|
||||
def active_subscriptions
|
||||
Subscription.where(account: @account).active.pluck(:id)
|
||||
end
|
||||
def perform(stream_entry_ids); end
|
||||
end
|
||||
|
|
|
@ -5,18 +5,5 @@ class Pubsubhubbub::RawDistributionWorker
|
|||
|
||||
sidekiq_options queue: 'push'
|
||||
|
||||
def perform(xml, source_account_id)
|
||||
@account = Account.find(source_account_id)
|
||||
@subscriptions = active_subscriptions.to_a
|
||||
|
||||
Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription|
|
||||
[subscription.id, xml]
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def active_subscriptions
|
||||
Subscription.where(account: @account).active.select('id, callback_url, domain')
|
||||
end
|
||||
def perform(xml, source_account_id); end
|
||||
end
|
||||
|
|
|
@ -5,30 +5,5 @@ class Pubsubhubbub::SubscribeWorker
|
|||
|
||||
sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false
|
||||
|
||||
sidekiq_retry_in do |count|
|
||||
case count
|
||||
when 0
|
||||
30.minutes.seconds
|
||||
when 1
|
||||
2.hours.seconds
|
||||
when 2
|
||||
12.hours.seconds
|
||||
else
|
||||
24.hours.seconds * (count - 2)
|
||||
end
|
||||
end
|
||||
|
||||
sidekiq_retries_exhausted do |msg, _e|
|
||||
account = Account.find(msg['args'].first)
|
||||
Sidekiq.logger.error "PuSH subscription attempts for #{account.acct} exhausted. Unsubscribing"
|
||||
::UnsubscribeService.new.call(account)
|
||||
end
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
logger.debug "PuSH re-subscribing to #{account.acct}"
|
||||
::SubscribeService.new.call(account)
|
||||
rescue => e
|
||||
raise e.class, "Subscribe failed for #{account&.acct}: #{e.message}", e.backtrace[0]
|
||||
end
|
||||
def perform(account_id); end
|
||||
end
|
||||
|
|
|
@ -5,11 +5,5 @@ class Pubsubhubbub::UnsubscribeWorker
|
|||
|
||||
sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false
|
||||
|
||||
def perform(account_id)
|
||||
account = Account.find(account_id)
|
||||
logger.debug "PuSH unsubscribing from #{account.acct}"
|
||||
::UnsubscribeService.new.call(account)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
def perform(account_id); end
|
||||
end
|
||||
|
|
|
@ -5,9 +5,5 @@ class RemoteProfileUpdateWorker
|
|||
|
||||
sidekiq_options queue: 'pull'
|
||||
|
||||
def perform(account_id, body, resubscribe)
|
||||
UpdateRemoteProfileService.new.call(body, Account.find(account_id), resubscribe)
|
||||
rescue ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
def perform(account_id, body, resubscribe); end
|
||||
end
|
||||
|
|
|
@ -5,9 +5,5 @@ class SalmonWorker
|
|||
|
||||
sidekiq_options backtrace: true
|
||||
|
||||
def perform(account_id, body)
|
||||
ProcessInteractionService.new.call(body, Account.find(account_id))
|
||||
rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
|
||||
true
|
||||
end
|
||||
def perform(account_id, body); end
|
||||
end
|
||||
|
|
|
@ -5,13 +5,5 @@ class Scheduler::SubscriptionsScheduler
|
|||
|
||||
sidekiq_options unique: :until_executed, retry: 0
|
||||
|
||||
def perform
|
||||
Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id))
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def expiring_accounts
|
||||
Account.expiring(1.day.from_now).partitioned
|
||||
end
|
||||
def perform; end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue