diff --git a/src/post/distribute.ts b/src/post/distribute.ts index 49c6eb22d..ad699d6b8 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -1,8 +1,11 @@ +import Channel from '../models/channel'; +import ChannelWatching from '../models/channel-watching'; +import Following from '../models/following'; import Mute from '../models/mute'; import Post, { pack } from '../models/post'; import Watching from '../models/post-watching'; -import User from '../models/user'; -import stream from '../publishers/stream'; +import User, { isLocalUser } from '../models/user'; +import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; import queue from '../queue'; @@ -21,10 +24,6 @@ export default async (user, mentions, post) => { latestPost: post._id } }), - new Promise((resolve, reject) => queue.create('http', { - type: 'deliverPost', - id: post._id, - }).save(error => error ? reject(error) : resolve())), ] as Array>; function addMention(promisedMentionee, reason) { @@ -50,6 +49,91 @@ export default async (user, mentions, post) => { })); } + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return new Promise((resolve, reject) => { + queue.create('http', { + type: 'deliverPost', + fromId: user._id, + toId: following.followerId, + postId: post._id + }).save(error => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + // If has in reply to post if (post.replyId) { promises.push( diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts index c00ab912c..48ad4f95a 100644 --- a/src/processor/http/deliver-post.ts +++ b/src/processor/http/deliver-post.ts @@ -1,93 +1,21 @@ -import Channel from '../../models/channel'; -import Following from '../../models/following'; -import ChannelWatching from '../../models/channel-watching'; -import Post, { pack } from '../../models/post'; -import User, { isLocalUser } from '../../models/user'; -import stream, { publishChannelStream } from '../../publishers/stream'; +import Post from '../../models/post'; +import User, { IRemoteUser } from '../../models/user'; import context from '../../remote/activitypub/renderer/context'; import renderCreate from '../../remote/activitypub/renderer/create'; import renderNote from '../../remote/activitypub/renderer/note'; import request from '../../remote/request'; -export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { - const promisedPostObj = pack(post); - const promises = []; +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); - // タイムラインへの投稿 - if (!post.channelId) { - promises.push( - // Publish event to myself's stream - promisedPostObj.then(postObj => { - stream(post.userId, 'post', postObj); - }), + create['@context'] = context; - Promise.all([ - User.findOne({ _id: post.userId }), - - // Fetch all followers - Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'follower' - } - }, { - $match: { - followeeId: post.userId - } - }], { - _id: false - }) - ]).then(([user, followers]) => Promise.all(followers.map(following => { - if (isLocalUser(following.follower)) { - // Publish event to followers stream - return promisedPostObj.then(postObj => { - stream(following.followerId, 'post', postObj); - }); - } - - return renderNote(user, post).then(note => { - const create = renderCreate(note); - create['@context'] = context; - return request(user, following.follower[0].account.inbox, create); - }); - }))) - ); - } - - // チャンネルへの投稿 - if (post.channelId) { - promises.push( - // Increment channel index(posts count) - Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } - }), - - // Publish event to channel - promisedPostObj.then(postObj => { - publishChannelStream(post.channelId, 'post', postObj); - }), - - Promise.all([ - promisedPostObj, - - // Get channel watchers - ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([postObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', postObj); - }); - }) - ); - } - - return Promise.all(promises); -}); + return request(from, to.account.inbox, create); +};