Home Reference Source

src/users-new/UserRepository.ts

import produce, {castDraft, Immutable} from 'immer';
import {merge, Observable, of, OperatorFunction, Subject, throwError} from 'rxjs';
import {filter, map, scan, shareReplay, startWith, switchMap, tap} from 'rxjs/operators';
import Logs from '../Logs';
import {Resolver} from '../wac-proxy/wac-stack/resolver/Resolver';
import {ResolvedUsersDto, UserDto} from '../wac-proxy/wac-stack/resolver/types';
import {User} from './User';
import {PresenceService} from '../wac-proxy/wac-stack/presence/PresenceService';
import {PresenceDto} from '../wac-proxy/wac-stack/presence/types';
import {UserRole} from './UserRole';
import {bufferDebounce} from '../utils/rxjs-operators';

const log = Logs.instance.getLogger('SippoJS/UserRepository');

type Action = Immutable<{
	type: 'USERS';
	resolved: ResolvedUsersDto['resolved'];
	subscribed: ResolvedUsersDto['subscribed'];
} | {
	type: 'USERS_EVENT';
	resolved: ResolvedUsersDto['resolved'];
	subscribed: ResolvedUsersDto['subscribed'];
} | {
	type: 'PRESENCE';
	presences: readonly PresenceDto[];
} | {
	type: 'PRESENCE_EVENT';
	presences: readonly PresenceDto[];
}>;

type State = Immutable<{
	users: {
		[key: string]: UserDto;
	};
	presences: {
		[key: string]: PresenceDto;
	};
}>;

const toUserRole = (role: UserDto['role']): UserRole => {
	switch (role) {
		case 'anonymous': return UserRole.ANONYMOUS;
		case 'user': return UserRole.USER;
	}
};

const toUser = (userDto: UserDto, presence: PresenceDto): User => ({
	id: userDto.id,
	domain: userDto.domain,
	username: userDto.username,
	email: userDto.email,
	mobilePhones: userDto.mobilePhone,
	alias: userDto.alias,
	gatewayUsername: userDto.gatewayUsername,
	role: toUserRole(userDto.role),
	online: presence.online,
	activity: presence.activity,
	mood: presence.mood,
	note: presence.note,
	avatar: presence.avatar,
	displayName: presence.displayName,
});

function mergeUsersAndPresences({users, presences}: State): readonly User[] {
	return Object.keys(users).map((id) => {
		if (presences[`wac-user:${id}`]) {
			return toUser(users[id], presences[`wac-user:${id}`]);
		}
	}).filter((user): user is User => user !== undefined);
}

const reducer = produce<State, [Action]>((draft, action) => {
	switch (action.type) {
		case 'USERS':
		case 'USERS_EVENT':
			action.resolved.forEach((user) => {
				draft.users[user.id] = castDraft(user);
			});
			return;
		case 'PRESENCE':
		case 'PRESENCE_EVENT':
			action.presences.forEach((presence) => {
				draft.presences[presence.address] = presence;
			});
			return;
	}
});

/**
 * Allows retrieving users. An instance of this class must be obtained using
 * {@link Session.getUserRepository}
 */
export class UserRepository {
	private users$: Observable<readonly User[]>;
	private requestSubscriptionSubject = new Subject<string>();
	private currentSubscriptions = new Set<string>();

	constructor(
		private resolver: Resolver,
		private presenceService: PresenceService,
	) {
		this.users$ = this.getActions().pipe(
			startWith({users: {}, presences: {}}) as OperatorFunction<Action, Action>,
			tap(x => log.debug('action', x)),
			scan(reducer),
			tap(x => log.debug('state.users', x.users)),
			tap(x => log.debug('state.presences', x.presences)),
			map(state => mergeUsersAndPresences(state)),
			shareReplay(1),
		);
		this.users$.subscribe();
	}

	/**
	 * Allows obtaining a representation of any user of the system
	 * @param userId The id of the requested user
	 * @returns An observable emitting a representation of the user with the provided ID as soon
	 * as it is available and, subsequently, all the updates that occur in any of its properties.
	 */
	getUser$(userId: string): Observable<User> {
		return this.users$.pipe(
			tap(() => {
				if (!this.currentSubscriptions.has(userId)) {
					this.requestSubscriptionSubject.next(userId);
					this.currentSubscriptions.add(userId);
				}
			}),
			map(users => users.find(user => user.id === userId)),
			filter((user): user is User => user !== undefined),
		);
	}

	/**
	 * Allows obtaining a representation of any user of the system
	 * @param address The address of the requested user
	 * @returns An observable emitting a representation of the user with the provided ID as soon
	 * as it is available and, subsequently, all the updates that occur in any of its properties.
	 */
	getUserByAddress$(address: string): Observable<User> {
		return this.users$.pipe(
			switchMap((users) => {
				const user = users.find(x => `${x.username}@${x.domain}` === address);
				if (!user) {
					return this.resolver.getUserByGatewayUsername$(address).pipe(
						switchMap(id => !id ? throwError(`Cannot resolve user address: ${address}`) : this.getUser$(id)),
					);
				}
				return of(user);
			}),
		);
	}

	private getActions(): Observable<Action> {
		return merge(
			this.resolver.event$.pipe(
				map((resolvedUsers): Action => ({type: 'USERS_EVENT', ...resolvedUsers.body})),
			),
			this.presenceService.event$.pipe(
				map((event): Action => ({type: 'PRESENCE_EVENT', presences: [event.body]})),
			),
			this.requestSubscriptionSubject.pipe(
				bufferDebounce(100),
				switchMap(userIds => this.resolver.resolveUsers$(userIds)),
				map((resolvedUsers): Action => ({type: 'USERS', ...resolvedUsers})),
			),
			this.requestSubscriptionSubject.pipe(
				bufferDebounce(100),
				switchMap(userIds => this.presenceService.subscribeToPresences$(userIds.map(id => `wac-user:${id}`))),
				map((presences): Action => ({type: 'PRESENCE', presences})),
			),
		);
	}
}