diff --git a/app/controllers/api/v1/notifications/requests_controller.rb b/app/controllers/api/v1/notifications/requests_controller.rb index 0710166d05..b820a2ed2e 100644 --- a/app/controllers/api/v1/notifications/requests_controller.rb +++ b/app/controllers/api/v1/notifications/requests_controller.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true class Api::V1::Notifications::RequestsController < Api::BaseController - before_action -> { doorkeeper_authorize! :read, :'read:notifications' }, only: :index - before_action -> { doorkeeper_authorize! :write, :'write:notifications' }, except: :index + include Redisable + + before_action -> { doorkeeper_authorize! :read, :'read:notifications' }, only: [:index, :show, :merged?] + before_action -> { doorkeeper_authorize! :write, :'write:notifications' }, except: [:index, :show, :merged?] before_action :require_user! before_action :set_request, only: [:show, :accept, :dismiss] @@ -19,6 +21,10 @@ class Api::V1::Notifications::RequestsController < Api::BaseController render json: @requests, each_serializer: REST::NotificationRequestSerializer, relationships: @relationships end + def merged? + render json: { merged: redis.get("notification_unfilter_jobs:#{current_account.id}").to_i <= 0 } + end + def show render json: @request, serializer: REST::NotificationRequestSerializer end diff --git a/app/javascript/mastodon/actions/notification_groups.ts b/app/javascript/mastodon/actions/notification_groups.ts index fd6a48e9f3..6b699706e2 100644 --- a/app/javascript/mastodon/actions/notification_groups.ts +++ b/app/javascript/mastodon/actions/notification_groups.ts @@ -138,8 +138,18 @@ export const processNewNotificationForGroups = createAppAsyncThunk( export const loadPending = createAction('notificationGroups/loadPending'); -export const updateScrollPosition = createAction<{ top: boolean }>( +export const updateScrollPosition = createAppAsyncThunk( 'notificationGroups/updateScrollPosition', + ({ top }: { top: boolean }, { dispatch, getState }) => { + if ( + top && + getState().notificationGroups.mergedNotifications === 'needs-reload' + ) { + void dispatch(fetchNotifications()); + } + + return { top }; + }, ); export const setNotificationsFilter = createAppAsyncThunk( @@ -165,5 +175,34 @@ export const markNotificationsAsRead = createAction( 'notificationGroups/markAsRead', ); -export const mountNotifications = createAction('notificationGroups/mount'); +export const mountNotifications = createAppAsyncThunk( + 'notificationGroups/mount', + (_, { dispatch, getState }) => { + const state = getState(); + + if ( + state.notificationGroups.mounted === 0 && + state.notificationGroups.mergedNotifications === 'needs-reload' + ) { + void dispatch(fetchNotifications()); + } + }, +); + export const unmountNotifications = createAction('notificationGroups/unmount'); + +export const refreshStaleNotificationGroups = createAppAsyncThunk<{ + deferredRefresh: boolean; +}>('notificationGroups/refreshStale', (_, { dispatch, getState }) => { + const state = getState(); + + if ( + state.notificationGroups.scrolledToTop || + !state.notificationGroups.mounted + ) { + void dispatch(fetchNotifications()); + return { deferredRefresh: false }; + } + + return { deferredRefresh: true }; +}); diff --git a/app/javascript/mastodon/actions/streaming.js b/app/javascript/mastodon/actions/streaming.js index 630da3ef6e..e082aea43d 100644 --- a/app/javascript/mastodon/actions/streaming.js +++ b/app/javascript/mastodon/actions/streaming.js @@ -10,7 +10,7 @@ import { deleteAnnouncement, } from './announcements'; import { updateConversations } from './conversations'; -import { processNewNotificationForGroups } from './notification_groups'; +import { processNewNotificationForGroups, refreshStaleNotificationGroups } from './notification_groups'; import { updateNotifications, expandNotifications } from './notifications'; import { updateStatus } from './statuses'; import { @@ -108,6 +108,14 @@ export const connectTimelineStream = (timelineId, channelName, params = {}, opti } break; } + case 'notifications_merged': + const state = getState(); + if (state.notifications.top || !state.notifications.mounted) + dispatch(expandNotifications({ forceLoad: true, maxId: undefined })); + if(state.settings.getIn(['notifications', 'groupingBeta'], false)) { + dispatch(refreshStaleNotificationGroups()); + } + break; case 'conversation': // @ts-expect-error dispatch(updateConversations(JSON.parse(data.payload))); diff --git a/app/javascript/mastodon/features/notifications_v2/index.tsx b/app/javascript/mastodon/features/notifications_v2/index.tsx index 63e602bdcc..29c49e05c8 100644 --- a/app/javascript/mastodon/features/notifications_v2/index.tsx +++ b/app/javascript/mastodon/features/notifications_v2/index.tsx @@ -81,7 +81,11 @@ export const Notifications: React.FC<{ const anyPendingNotification = useAppSelector(selectAnyPendingNotification); - const isUnread = unreadNotificationsCount > 0; + const needsReload = useAppSelector( + (state) => state.notificationGroups.mergedNotifications === 'needs-reload', + ); + + const isUnread = unreadNotificationsCount > 0 || needsReload; const canMarkAsRead = useAppSelector(selectSettingsNotificationsShowUnread) && @@ -118,11 +122,11 @@ export const Notifications: React.FC<{ // Keep track of mounted components for unread notification handling useEffect(() => { - dispatch(mountNotifications()); + void dispatch(mountNotifications()); return () => { dispatch(unmountNotifications()); - dispatch(updateScrollPosition({ top: false })); + void dispatch(updateScrollPosition({ top: false })); }; }, [dispatch]); @@ -147,11 +151,11 @@ export const Notifications: React.FC<{ }, [dispatch]); const handleScrollToTop = useDebouncedCallback(() => { - dispatch(updateScrollPosition({ top: true })); + void dispatch(updateScrollPosition({ top: true })); }, 100); const handleScroll = useDebouncedCallback(() => { - dispatch(updateScrollPosition({ top: false })); + void dispatch(updateScrollPosition({ top: false })); }, 100); useEffect(() => { diff --git a/app/javascript/mastodon/reducers/notification_groups.ts b/app/javascript/mastodon/reducers/notification_groups.ts index 0d3f34d2fb..0348f080ff 100644 --- a/app/javascript/mastodon/reducers/notification_groups.ts +++ b/app/javascript/mastodon/reducers/notification_groups.ts @@ -19,6 +19,7 @@ import { markNotificationsAsRead, mountNotifications, unmountNotifications, + refreshStaleNotificationGroups, } from 'mastodon/actions/notification_groups'; import { disconnectTimeline, @@ -51,6 +52,7 @@ interface NotificationGroupsState { readMarkerId: string; mounted: number; isTabVisible: boolean; + mergedNotifications: 'ok' | 'pending' | 'needs-reload'; } const initialState: NotificationGroupsState = { @@ -58,6 +60,8 @@ const initialState: NotificationGroupsState = { pendingGroups: [], // holds pending groups in slow mode scrolledToTop: false, isLoading: false, + // this is used to track whether we need to refresh notifications after accepting requests + mergedNotifications: 'ok', // The following properties are used to track unread notifications lastReadId: '0', // used internally for unread notifications readMarkerId: '0', // user-facing and updated when focus changes @@ -301,6 +305,7 @@ export const notificationGroupsReducer = createReducer( json.type === 'gap' ? json : createNotificationGroupFromJSON(json), ); state.isLoading = false; + state.mergedNotifications = 'ok'; updateLastReadId(state); }) .addCase(fetchNotificationsGap.fulfilled, (state, action) => { @@ -455,7 +460,7 @@ export const notificationGroupsReducer = createReducer( state.groups = state.pendingGroups.concat(state.groups); state.pendingGroups = []; }) - .addCase(updateScrollPosition, (state, action) => { + .addCase(updateScrollPosition.fulfilled, (state, action) => { state.scrolledToTop = action.payload.top; updateLastReadId(state); trimNotifications(state); @@ -482,7 +487,7 @@ export const notificationGroupsReducer = createReducer( action.payload.markers.notifications.last_read_id; } }) - .addCase(mountNotifications, (state) => { + .addCase(mountNotifications.fulfilled, (state) => { state.mounted += 1; commitLastReadId(state); updateLastReadId(state); @@ -498,6 +503,10 @@ export const notificationGroupsReducer = createReducer( .addCase(unfocusApp, (state) => { state.isTabVisible = false; }) + .addCase(refreshStaleNotificationGroups.fulfilled, (state, action) => { + if (action.payload.deferredRefresh) + state.mergedNotifications = 'needs-reload'; + }) .addMatcher( isAnyOf(authorizeFollowRequestSuccess, rejectFollowRequestSuccess), (state, action) => { diff --git a/app/services/accept_notification_request_service.rb b/app/services/accept_notification_request_service.rb index ad27ae3300..60ec6bb3b6 100644 --- a/app/services/accept_notification_request_service.rb +++ b/app/services/accept_notification_request_service.rb @@ -1,9 +1,21 @@ # frozen_string_literal: true class AcceptNotificationRequestService < BaseService + include Redisable + def call(request) NotificationPermission.create!(account: request.account, from_account: request.from_account) + increment_worker_count!(request) UnfilterNotificationsWorker.perform_async(request.account_id, request.from_account_id) request.destroy! end + + private + + def increment_worker_count!(request) + with_redis do |redis| + redis.incr("notification_unfilter_jobs:#{request.account_id}") + redis.expire("notification_unfilter_jobs:#{request.account_id}", 30.minutes.to_i) + end + end end diff --git a/app/workers/unfilter_notifications_worker.rb b/app/workers/unfilter_notifications_worker.rb index 4351758907..53a35ce12c 100644 --- a/app/workers/unfilter_notifications_worker.rb +++ b/app/workers/unfilter_notifications_worker.rb @@ -2,6 +2,7 @@ class UnfilterNotificationsWorker include Sidekiq::Worker + include Redisable # Earlier versions of the feature passed a `notification_request` ID # If `to_account_id` is passed, the first argument is an account ID @@ -9,19 +10,20 @@ class UnfilterNotificationsWorker def perform(notification_request_or_account_id, from_account_id = nil) if from_account_id.present? @notification_request = nil - @from_account = Account.find(from_account_id) - @recipient = Account.find(notification_request_or_account_id) + @from_account = Account.find_by(id: from_account_id) + @recipient = Account.find_by(id: notification_request_or_account_id) else - @notification_request = NotificationRequest.find(notification_request_or_account_id) - @from_account = @notification_request.from_account - @recipient = @notification_request.account + @notification_request = NotificationRequest.find_by(id: notification_request_or_account_id) + @from_account = @notification_request&.from_account + @recipient = @notification_request&.account end + return if @from_account.nil? || @recipient.nil? + push_to_conversations! unfilter_notifications! remove_request! - rescue ActiveRecord::RecordNotFound - true + decrement_worker_count! end private @@ -45,4 +47,17 @@ class UnfilterNotificationsWorker def notifications_with_private_mentions filtered_notifications.where(type: :mention).joins(mention: :status).merge(Status.where(visibility: :direct)).includes(mention: :status) end + + def decrement_worker_count! + value = redis.decr("notification_unfilter_jobs:#{@recipient.id}") + push_streaming_event! if value <= 0 && subscribed_to_streaming_api? + end + + def push_streaming_event! + redis.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notifications_merged, payload: '1')) + end + + def subscribed_to_streaming_api? + redis.exists?("subscribed:timeline:#{@recipient.id}") || redis.exists?("subscribed:timeline:#{@recipient.id}:notifications") + end end diff --git a/config/routes/api.rb b/config/routes/api.rb index 488bdb7453..c5addd3385 100644 --- a/config/routes/api.rb +++ b/config/routes/api.rb @@ -158,6 +158,7 @@ namespace :api, format: false do collection do post :accept, to: 'requests#accept_bulk' post :dismiss, to: 'requests#dismiss_bulk' + get :merged, to: 'requests#merged?' end member do diff --git a/spec/requests/api/v1/notifications/requests_spec.rb b/spec/requests/api/v1/notifications/requests_spec.rb index e1fe17426a..c385fb3499 100644 --- a/spec/requests/api/v1/notifications/requests_spec.rb +++ b/spec/requests/api/v1/notifications/requests_spec.rb @@ -120,4 +120,34 @@ RSpec.describe 'Requests' do expect(response).to have_http_status(200) end end + + describe 'GET /api/v1/notifications/requests/merged' do + subject do + get '/api/v1/notifications/requests/merged', headers: headers + end + + it_behaves_like 'forbidden for wrong scope', 'write write:notifications' + + context 'when the user has no accepted request pending merge' do + it 'returns http success and returns merged: true' do + subject + + expect(response).to have_http_status(200) + expect(body_as_json).to eq({ merged: true }) + end + end + + context 'when the user has an accepted request pending merge' do + before do + redis.set("notification_unfilter_jobs:#{user.account_id}", 1) + end + + it 'returns http success and returns merged: false' do + subject + + expect(response).to have_http_status(200) + expect(body_as_json).to eq({ merged: false }) + end + end + end end diff --git a/spec/services/accept_notification_request_service_spec.rb b/spec/services/accept_notification_request_service_spec.rb index bf67a52225..3c12c285a4 100644 --- a/spec/services/accept_notification_request_service_spec.rb +++ b/spec/services/accept_notification_request_service_spec.rb @@ -8,10 +8,11 @@ RSpec.describe AcceptNotificationRequestService do let(:notification_request) { Fabricate(:notification_request) } describe '#call' do - it 'destroys the notification request, creates a permission, and queues a worker' do + it 'destroys the notification request, creates a permission, increases the jobs count and queues a worker' do expect { subject.call(notification_request) } .to change { NotificationRequest.exists?(notification_request.id) }.to(false) .and change { NotificationPermission.exists?(account_id: notification_request.account_id, from_account_id: notification_request.from_account_id) }.to(true) + .and change { redis.get("notification_unfilter_jobs:#{notification_request.account_id}").to_i }.by(1) expect(UnfilterNotificationsWorker).to have_enqueued_sidekiq_job(notification_request.account_id, notification_request.from_account_id) end diff --git a/spec/workers/unfilter_notifications_worker_spec.rb b/spec/workers/unfilter_notifications_worker_spec.rb index 3f43b298a5..629eb644e4 100644 --- a/spec/workers/unfilter_notifications_worker_spec.rb +++ b/spec/workers/unfilter_notifications_worker_spec.rb @@ -13,13 +13,56 @@ describe UnfilterNotificationsWorker do Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :mention, activity: mention) follow_request = sender.request_follow!(recipient) Fabricate(:notification, filtered: true, from_account: sender, account: recipient, type: :follow_request, activity: follow_request) + allow(redis).to receive(:publish) + allow(redis).to receive(:exists?).and_return(false) end shared_examples 'shared behavior' do - it 'unfilters notifications and adds private messages to conversations' do - expect { subject } - .to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false]) - .and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true) + context 'when this is the last pending merge job and the user is subscribed to streaming' do + before do + redis.set("notification_unfilter_jobs:#{recipient.id}", 1) + allow(redis).to receive(:exists?).with("subscribed:timeline:#{recipient.id}").and_return(true) + end + + it 'unfilters notifications, adds private messages to conversations, and pushes to redis' do + expect { subject } + .to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false]) + .and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true) + .and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1) + + expect(redis).to have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}') + end + end + + context 'when this is not last pending merge job and the user is subscribed to streaming' do + before do + redis.set("notification_unfilter_jobs:#{recipient.id}", 2) + allow(redis).to receive(:exists?).with("subscribed:timeline:#{recipient.id}").and_return(true) + end + + it 'unfilters notifications, adds private messages to conversations, and does not push to redis' do + expect { subject } + .to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false]) + .and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true) + .and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1) + + expect(redis).to_not have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}') + end + end + + context 'when this is the last pending merge job and the user is not subscribed to streaming' do + before do + redis.set("notification_unfilter_jobs:#{recipient.id}", 1) + end + + it 'unfilters notifications, adds private messages to conversations, and does not push to redis' do + expect { subject } + .to change { recipient.notifications.where(from_account_id: sender.id).pluck(:filtered) }.from([true, true]).to([false, false]) + .and change { recipient.conversations.exists?(last_status_id: sender.statuses.first.id) }.to(true) + .and change { redis.get("notification_unfilter_jobs:#{recipient.id}").to_i }.by(-1) + + expect(redis).to_not have_received(:publish).with("timeline:#{recipient.id}:notifications", '{"event":"notifications_merged","payload":"1"}') + end end end