catstodon/app/services/activitypub/process_account_service.rb
Claire 9a19227f17
Fix some RedisLocks auto-releasing too fast (#16276)
* Fix Delete and Create-related locks expiring too fast

Fixes #16238

By default, RedisLock expires after 10 seconds, which may not be enough to
process statuses, especially when those have attached media files.

This commit extends those 10 seconds to 15 minutes, which should be plenty
enough to handle any status, while being short enough to not waste many
sidekiq job retries in the exceedingly rare case in which a sidekiq process
would crash when processing a `Create` or `Delete`.

* Fix other RedisLock autorelease durations

Fixes #15645

- things that only perform a few simple database queries (e.g. finding and
  saving a record) have been left unchanged, so they'll still use the default
  10s duration
- things that perform significantly more complex database queries have been
  changed to a 5 minutes timeout
- things that perform multiple HTTP queries have been changed to a 15 minutes
  timeout
2021-05-19 23:52:08 +02:00

345 lines
11 KiB
Ruby

# frozen_string_literal: true
class ActivityPub::ProcessAccountService < BaseService
include JsonLdHelper
include DomainControlHelper
# Should be called with confirmed valid JSON
# and WebFinger-resolved username and domain
def call(username, domain, json, options = {})
return if json['inbox'].blank? || unsupported_uri_scheme?(json['id']) || domain_not_allowed?(domain)
@options = options
@json = json
@uri = @json['id']
@username = username
@domain = domain
@collections = {}
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
@account = Account.remote.find_by(uri: @uri) if @options[:only_key]
@account ||= Account.find_remote(@username, @domain)
@old_public_key = @account&.public_key
@old_protocol = @account&.protocol
@suspension_changed = false
create_account if @account.nil?
update_account
process_tags
process_attachments
process_duplicate_accounts! if @options[:verified_webfinger]
else
raise Mastodon::RaceConditionError
end
end
return if @account.nil?
after_protocol_change! if protocol_changed?
after_key_change! if key_changed? && !@options[:signed_with_known_key]
clear_tombstones! if key_changed?
after_suspension_change! if suspension_changed?
unless @options[:only_key] || @account.suspended?
check_featured_collection! if @account.featured_collection_url.present?
check_links! unless @account.fields.empty?
end
@account
rescue Oj::ParseError
nil
end
private
def create_account
@account = Account.new
@account.protocol = :activitypub
@account.username = @username
@account.domain = @domain
@account.private_key = nil
@account.suspended_at = domain_block.created_at if auto_suspend?
@account.suspension_origin = :local if auto_suspend?
@account.silenced_at = domain_block.created_at if auto_silence?
@account.save
end
def update_account
@account.last_webfingered_at = Time.now.utc unless @options[:only_key]
@account.protocol = :activitypub
set_suspension!
set_immediate_protocol_attributes!
set_fetchable_key! unless @account.suspended? && @account.suspension_origin_local?
set_immediate_attributes! unless @account.suspended?
set_fetchable_attributes! unless @options[:only_key] || @account.suspended?
@account.save_with_optional_media!
end
def set_immediate_protocol_attributes!
@account.inbox_url = @json['inbox'] || ''
@account.outbox_url = @json['outbox'] || ''
@account.shared_inbox_url = (@json['endpoints'].is_a?(Hash) ? @json['endpoints']['sharedInbox'] : @json['sharedInbox']) || ''
@account.followers_url = @json['followers'] || ''
@account.url = url || @uri
@account.uri = @uri
@account.actor_type = actor_type
@account.created_at = @json['published'] if @json['published'].present?
end
def set_immediate_attributes!
@account.featured_collection_url = @json['featured'] || ''
@account.devices_url = @json['devices'] || ''
@account.display_name = @json['name'] || ''
@account.note = @json['summary'] || ''
@account.locked = @json['manuallyApprovesFollowers'] || false
@account.fields = property_values || {}
@account.also_known_as = as_array(@json['alsoKnownAs'] || []).map { |item| value_or_id(item) }
@account.discoverable = @json['discoverable'] || false
end
def set_fetchable_key!
@account.public_key = public_key || ''
end
def set_fetchable_attributes!
begin
@account.avatar_remote_url = image_url('icon') || '' unless skip_download?
rescue Mastodon::UnexpectedResponseError, HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError
RedownloadAvatarWorker.perform_in(rand(30..600).seconds, @account.id)
end
begin
@account.header_remote_url = image_url('image') || '' unless skip_download?
rescue Mastodon::UnexpectedResponseError, HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError
RedownloadHeaderWorker.perform_in(rand(30..600).seconds, @account.id)
end
@account.statuses_count = outbox_total_items if outbox_total_items.present?
@account.following_count = following_total_items if following_total_items.present?
@account.followers_count = followers_total_items if followers_total_items.present?
@account.hide_collections = following_private? || followers_private?
@account.moved_to_account = @json['movedTo'].present? ? moved_account : nil
end
def set_suspension!
return if @account.suspended? && @account.suspension_origin_local?
if @account.suspended? && !@json['suspended']
@account.unsuspend!
@suspension_changed = true
elsif !@account.suspended? && @json['suspended']
@account.suspend!(origin: :remote)
@suspension_changed = true
end
end
def after_protocol_change!
ActivityPub::PostUpgradeWorker.perform_async(@account.domain)
end
def after_key_change!
RefollowWorker.perform_async(@account.id)
end
def after_suspension_change!
if @account.suspended?
Admin::SuspensionWorker.perform_async(@account.id)
else
Admin::UnsuspensionWorker.perform_async(@account.id)
end
end
def check_featured_collection!
ActivityPub::SynchronizeFeaturedCollectionWorker.perform_async(@account.id)
end
def check_links!
VerifyAccountLinksWorker.perform_async(@account.id)
end
def process_duplicate_accounts!
return unless Account.where(uri: @account.uri).where.not(id: @account.id).exists?
AccountMergingWorker.perform_async(@account.id)
end
def actor_type
if @json['type'].is_a?(Array)
@json['type'].find { |type| ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES.include?(type) }
else
@json['type']
end
end
def image_url(key)
value = first_of_value(@json[key])
return if value.nil?
return value['url'] if value.is_a?(Hash)
image = fetch_resource_without_id_validation(value)
image['url'] if image
end
def public_key
value = first_of_value(@json['publicKey'])
return if value.nil?
return value['publicKeyPem'] if value.is_a?(Hash)
key = fetch_resource_without_id_validation(value)
key['publicKeyPem'] if key
end
def url
return if @json['url'].blank?
url_candidate = url_to_href(@json['url'], 'text/html')
if unsupported_uri_scheme?(url_candidate) || mismatching_origin?(url_candidate)
nil
else
url_candidate
end
end
def property_values
return unless @json['attachment'].is_a?(Array)
as_array(@json['attachment']).select { |attachment| attachment['type'] == 'PropertyValue' }.map { |attachment| attachment.slice('name', 'value') }
end
def mismatching_origin?(url)
needle = Addressable::URI.parse(url).host
haystack = Addressable::URI.parse(@uri).host
!haystack.casecmp(needle).zero?
end
def outbox_total_items
collection_info('outbox').first
end
def following_total_items
collection_info('following').first
end
def followers_total_items
collection_info('followers').first
end
def following_private?
!collection_info('following').last
end
def followers_private?
!collection_info('followers').last
end
def collection_info(type)
return [nil, nil] if @json[type].blank?
return @collections[type] if @collections.key?(type)
collection = fetch_resource_without_id_validation(@json[type])
total_items = collection.is_a?(Hash) && collection['totalItems'].present? && collection['totalItems'].is_a?(Numeric) ? collection['totalItems'] : nil
has_first_page = collection.is_a?(Hash) && collection['first'].present?
@collections[type] = [total_items, has_first_page]
rescue HTTP::Error, OpenSSL::SSL::SSLError, Mastodon::LengthValidationError
@collections[type] = [nil, nil]
end
def moved_account
account = ActivityPub::TagManager.instance.uri_to_resource(@json['movedTo'], Account)
account ||= ActivityPub::FetchRemoteAccountService.new.call(@json['movedTo'], id: true, break_on_redirect: true)
account
end
def skip_download?
@account.suspended? || domain_block&.reject_media?
end
def auto_suspend?
domain_block&.suspend?
end
def auto_silence?
domain_block&.silence?
end
def domain_block
return @domain_block if defined?(@domain_block)
@domain_block = DomainBlock.rule_for(@domain)
end
def key_changed?
!@old_public_key.nil? && @old_public_key != @account.public_key
end
def suspension_changed?
@suspension_changed
end
def clear_tombstones!
Tombstone.where(account_id: @account.id).delete_all
end
def protocol_changed?
!@old_protocol.nil? && @old_protocol != @account.protocol
end
def lock_options
{ redis: Redis.current, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
end
def process_tags
return if @json['tag'].blank?
as_array(@json['tag']).each do |tag|
process_emoji tag if equals_or_includes?(tag['type'], 'Emoji')
end
end
def process_attachments
return if @json['attachment'].blank?
previous_proofs = @account.identity_proofs.to_a
current_proofs = []
as_array(@json['attachment']).each do |attachment|
next unless equals_or_includes?(attachment['type'], 'IdentityProof')
current_proofs << process_identity_proof(attachment)
end
previous_proofs.each do |previous_proof|
next if current_proofs.any? { |current_proof| current_proof.id == previous_proof.id }
previous_proof.delete
end
end
def process_emoji(tag)
return if skip_download?
return if tag['name'].blank? || tag['icon'].blank? || tag['icon']['url'].blank?
shortcode = tag['name'].delete(':')
image_url = tag['icon']['url']
uri = tag['id']
updated = tag['updated']
emoji = CustomEmoji.find_by(shortcode: shortcode, domain: @account.domain)
return unless emoji.nil? || image_url != emoji.image_remote_url || (updated && updated >= emoji.updated_at)
emoji ||= CustomEmoji.new(domain: @account.domain, shortcode: shortcode, uri: uri)
emoji.image_remote_url = image_url
emoji.save
end
def process_identity_proof(attachment)
provider = attachment['signatureAlgorithm']
provider_username = attachment['name']
token = attachment['signatureValue']
@account.identity_proofs.where(provider: provider, provider_username: provider_username).find_or_create_by(provider: provider, provider_username: provider_username, token: token)
end
end