src/users-new/UserRepository.ts
import produce, {castDraft, Immutable} from 'immer';
import {merge, Observable, of, OperatorFunction, Subject, throwError} from 'rxjs';
import {filter, map, scan, shareReplay, startWith, switchMap, tap} from 'rxjs/operators';
import Logs from '../Logs';
import {Resolver} from '../wac-proxy/wac-stack/resolver/Resolver';
import {ResolvedUsersDto, UserDto} from '../wac-proxy/wac-stack/resolver/types';
import {User} from './User';
import {PresenceService} from '../wac-proxy/wac-stack/presence/PresenceService';
import {PresenceDto} from '../wac-proxy/wac-stack/presence/types';
import {UserRole} from './UserRole';
import {bufferDebounce} from '../utils/rxjs-operators';
const log = Logs.instance.getLogger('SippoJS/UserRepository');
type Action = Immutable<{
type: 'USERS';
resolved: ResolvedUsersDto['resolved'];
subscribed: ResolvedUsersDto['subscribed'];
} | {
type: 'USERS_EVENT';
resolved: ResolvedUsersDto['resolved'];
subscribed: ResolvedUsersDto['subscribed'];
} | {
type: 'PRESENCE';
presences: readonly PresenceDto[];
} | {
type: 'PRESENCE_EVENT';
presences: readonly PresenceDto[];
}>;
type State = Immutable<{
users: {
[key: string]: UserDto;
};
presences: {
[key: string]: PresenceDto;
};
}>;
const toUserRole = (role: UserDto['role']): UserRole => {
switch (role) {
case 'anonymous': return UserRole.ANONYMOUS;
case 'user': return UserRole.USER;
}
};
const toUser = (userDto: UserDto, presence: PresenceDto): User => ({
id: userDto.id,
domain: userDto.domain,
username: userDto.username,
email: userDto.email,
mobilePhones: userDto.mobilePhone,
alias: userDto.alias,
gatewayUsername: userDto.gatewayUsername,
role: toUserRole(userDto.role),
online: presence.online,
activity: presence.activity,
mood: presence.mood,
note: presence.note,
avatar: presence.avatar,
displayName: presence.displayName,
});
function mergeUsersAndPresences({users, presences}: State): readonly User[] {
return Object.keys(users).map((id) => {
if (presences[`wac-user:${id}`]) {
return toUser(users[id], presences[`wac-user:${id}`]);
}
}).filter((user): user is User => user !== undefined);
}
const reducer = produce<State, [Action]>((draft, action) => {
switch (action.type) {
case 'USERS':
case 'USERS_EVENT':
action.resolved.forEach((user) => {
draft.users[user.id] = castDraft(user);
});
return;
case 'PRESENCE':
case 'PRESENCE_EVENT':
action.presences.forEach((presence) => {
draft.presences[presence.address] = presence;
});
return;
}
});
/**
* Allows retrieving users. An instance of this class must be obtained using
* {@link Session.getUserRepository}
*/
export class UserRepository {
private users$: Observable<readonly User[]>;
private requestSubscriptionSubject = new Subject<string>();
private currentSubscriptions = new Set<string>();
constructor(
private resolver: Resolver,
private presenceService: PresenceService,
) {
this.users$ = this.getActions().pipe(
startWith({users: {}, presences: {}}) as OperatorFunction<Action, Action>,
tap(x => log.debug('action', x)),
scan(reducer),
tap(x => log.debug('state.users', x.users)),
tap(x => log.debug('state.presences', x.presences)),
map(state => mergeUsersAndPresences(state)),
shareReplay(1),
);
this.users$.subscribe();
}
/**
* Allows obtaining a representation of any user of the system
* @param userId The id of the requested user
* @returns An observable emitting a representation of the user with the provided ID as soon
* as it is available and, subsequently, all the updates that occur in any of its properties.
*/
getUser$(userId: string): Observable<User> {
return this.users$.pipe(
tap(() => {
if (!this.currentSubscriptions.has(userId)) {
this.requestSubscriptionSubject.next(userId);
this.currentSubscriptions.add(userId);
}
}),
map(users => users.find(user => user.id === userId)),
filter((user): user is User => user !== undefined),
);
}
/**
* Allows obtaining a representation of any user of the system
* @param address The address of the requested user
* @returns An observable emitting a representation of the user with the provided ID as soon
* as it is available and, subsequently, all the updates that occur in any of its properties.
*/
getUserByAddress$(address: string): Observable<User> {
return this.users$.pipe(
switchMap((users) => {
const user = users.find(x => `${x.username}@${x.domain}` === address);
if (!user) {
return this.resolver.getUserByGatewayUsername$(address).pipe(
switchMap(id => !id ? throwError(`Cannot resolve user address: ${address}`) : this.getUser$(id)),
);
}
return of(user);
}),
);
}
private getActions(): Observable<Action> {
return merge(
this.resolver.event$.pipe(
map((resolvedUsers): Action => ({type: 'USERS_EVENT', ...resolvedUsers.body})),
),
this.presenceService.event$.pipe(
map((event): Action => ({type: 'PRESENCE_EVENT', presences: [event.body]})),
),
this.requestSubscriptionSubject.pipe(
bufferDebounce(100),
switchMap(userIds => this.resolver.resolveUsers$(userIds)),
map((resolvedUsers): Action => ({type: 'USERS', ...resolvedUsers})),
),
this.requestSubscriptionSubject.pipe(
bufferDebounce(100),
switchMap(userIds => this.presenceService.subscribeToPresences$(userIds.map(id => `wac-user:${id}`))),
map((presences): Action => ({type: 'PRESENCE', presences})),
),
);
}
}