Reactive Data Sync: Mastering Concurrent Updates in Large-Scale Applications

Resolving critical data consistency issues in a complex med-tech ecosystem through architectural analysis and implementation of reactive synchronization strategies.

Introduction

In complex software systems, particularly in the med-tech sector where data consistency and timely delivery are paramount, managing data synchronization across multiple layers of an application ecosystem presents unique challenges. These challenges are amplified in large-scale monorepos housing multiple interconnected applications and microservices. In this post, I’ll share my recent experience solving critical data consistency issues in a system that handles sensitive data under high-frequency updates and frequent conflicts.

The Problem: When Two Truths Diverge

The ecosystem I was working with consisted of five applications sharing a significant amount of code, supported by a dozen microservices. At the core of our architecture were two critical components:

1. A synchronous store for optimistic updates

2. A persistent database layer in the client applications

This setup aimed to provide a responsive user experience while ensuring data persistence and integrity. However, as our ecosystem grew and user interactions became more complex, I encountered a perfect storm of issues:

1. Misalignment Between Store and Database

The high frequency of writes and conflicts meant that our synchronous store and persistent database were often out of sync. This led to a situation where we no longer had a single source of truth, potentially causing data inconsistencies and loss of critical information.

2. Database Overwhelm

Under heavy load, particularly during periods of numerous conflicts, our database would sometimes “give up,” ceasing to persist data altogether. This created a dangerous scenario where users believed their inputs were being saved (due to the store updating), while in reality, no data persistence was occurring.

3. Refactoring Constraints

The scale of our codebase, coupled with strict requirements, made it impractical to refactor every part of our applications where high-frequency writes and conflicts occurred. I needed a solution that could be implemented at the core layer without disrupting existing business logic or requiring extensive changes across our applications.

The Solution: Reactive Data Sync

To address these challenges, I implemented a comprehensive solution at the base layer, specifically within our EntityStoreService. This architectural approach tackled the issues at their root without necessitating widespread changes across our applications. Let’s examine the key components of this solution.

1. Queuing Mechanism

The solution introduced a smart queuing system to manage updates more efficiently:

private updateQueue: Map<string, EntityQueuedOperation<T>> = new Map();
private updateSubject = new Subject<string>();

private queueOperation(
  id: string,
  operation: 'update' | 'remove',
  document: Partial<T>,
): Promise<PouchDB.Core.Response> {
  return new Promise((resolve, reject) => {
    const existingOperation = this.updateQueue.get(id);
    // Preserve the original enqueue time so long-waiting operations can be force-flushed later
    const initialQueueTimestamp = existingOperation?.initialQueueTimestamp || Date.now();

    // Coalesce: the latest operation for a given id replaces any pending one
    this.updateQueue.set(id, {
      id,
      operation,
      document,
      isProcessing: false,
      resolve,
      reject,
      initialQueueTimestamp,
    });

    this.updateSubject.next(id);
  });
}

This queue allows for batching updates and handling them more efficiently, reducing the likelihood of conflicts and database overwhelm. The architectural beauty of this approach lies in its ability to manage high-frequency updates at the core level, providing a buffer that smooths out the data flow across the entire application ecosystem.
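For reference, the EntityQueuedOperation type used above isn’t shown in the post; a minimal sketch, assuming only the fields the queue actually reads, might look like this:

interface EntityQueuedOperation<T> {
  id: string;
  operation: 'update' | 'remove';
  document: Partial<T>;
  isProcessing: boolean;
  initialQueueTimestamp: number;
  // Settle the promise returned by queueOperation once the write finally completes or fails
  resolve: (response: PouchDB.Core.Response) => void;
  reject: (error: unknown) => void;
}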

2. Debouncing, Smart Processing, and Queue Flushing

To further optimize the update process, a sophisticated mechanism combining debouncing, smart queue processing, and a flush queue feature was implemented:

private _setupSmartQueueProcessing(): void {
  // Debounced path: once a burst of notifications settles, process the last-signalled entity
  this.updateSubject.pipe(debounceTime(DEBOUNCE_TIME_MS)).subscribe((id) => this._processQueuedOperation(id));

  // Flush path: periodically force-process any operation that has waited longer than MAX_QUEUE_TIME_MS
  timer(0, DEBOUNCE_TIME_MS)
    .pipe(filter(() => this.updateQueue.size > 0))
    .subscribe(() => {
      const now = Date.now();
      const entries = Array.from(this.updateQueue.entries());

      for (const [id, operation] of entries) {
        if (now - operation.initialQueueTimestamp > MAX_QUEUE_TIME_MS) {
          this._processQueuedOperation(id);
        }
      }
    });
}

This multi-faceted approach ensures that the database isn’t overwhelmed with rapid-fire updates while still maintaining responsiveness. The debouncing mechanism coalesces multiple updates to the same entity, reducing unnecessary processing.

The flush queue mechanism, represented by the timer and subsequent processing, acts as a safeguard against operations lingering in the queue for too long. If an operation exceeds MAX_QUEUE_TIME_MS in the queue, it’s forcefully processed, ensuring that no update is indefinitely delayed.

This architectural decision strikes a delicate balance between efficiency and timely data persistence, crucial in an environment where both system performance and data immediacy are paramount.
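The post doesn’t show _processQueuedOperation itself, and the timing constants aren’t given either. As a rough sketch of what the processor might do — the dispatch to this.pouch and the constant values below are assumptions, not the original code:

// Illustrative values only; the real constants aren't shown in the post
const DEBOUNCE_TIME_MS = 300;
const MAX_QUEUE_TIME_MS = 2000;

// Hypothetical processor: pull the queued operation, persist it with retries,
// and settle the promise handed back to the caller by queueOperation
private async _processQueuedOperation(id: string): Promise<void> {
  const queued = this.updateQueue.get(id);
  if (!queued || queued.isProcessing) {
    return;
  }
  queued.isProcessing = true;

  try {
    // 'putDoc' is used elsewhere in the service; 'removeDoc' is an assumed counterpart
    const response = await this.executeWithRetry(
      () =>
        queued.operation === 'remove'
          ? this.pouch.removeDoc(queued.id)
          : this.pouch.putDoc(queued.document),
      queued.document as any,
    );

    this.updateQueue.delete(id);
    queued.resolve(response);
  } catch (error) {
    await this._handleFailedUpdate(queued, error);
  }
}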

3. Robust Error Handling and Retries with Reconnection Strategy

To address the issue of database failures, a comprehensive error handling and retry mechanism with an additional layer of resilience was implemented:

async executeWithRetry<T extends PouchContentBase>(
  operation: () => Promise<PouchDB.Core.Response>,
  newDoc: T,
): Promise<PouchDB.Core.Response> {
  const pouchReconnectTimeout = setTimeout(() => this.pouch.tryReconnection(), RETRY_TIMEOUT_MS * MAX_RETRIES);

  for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
    try {
      const result = await operation();

      clearTimeout(pouchReconnectTimeout);
      return result;
    } catch (error: any) {
      if (error.status === NOT_FOUND_ERROR_CODE) {
        clearTimeout(pouchReconnectTimeout);
        throw error;
      }
      if (error.status === CONFLICT_ERROR_CODE && attempt >= CONFLICT_RESOLUTION_THRESHOLD) {
        clearTimeout(pouchReconnectTimeout);

        return await this.resolveConflict(newDoc as PouchDocument<T>);
      }
      if (attempt === MAX_RETRIES - 1) {
        clearTimeout(pouchReconnectTimeout);
        await this.pouch.tryReconnection();
        throw new Error('Operation failed after all retry attempts');
      }
      // Back off before the next attempt (the delay grows linearly with the attempt count)
      await new Promise((resolve) => setTimeout(resolve, RETRY_TIMEOUT_MS * attempt));
    }
  }

  clearTimeout(pouchReconnectTimeout);

  throw new Error('Operation failed after all retry attempts');
}

This method ensures that temporary database issues don’t result in data loss, giving the system multiple opportunities to successfully persist data. The pouchReconnectTimeout serves as a final safety net. If operations consistently fail to complete within the allocated time, it triggers a forced reconnection to the database, essentially rebuilding the entire connection.
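For context, tryReconnection lives on the underlying Pouch wrapper service and isn’t shown in this post. A hypothetical sketch, reusing method names from the PouchDBService in the bonus section below, might rebuild the connection roughly like this:

// Hypothetical reconnection routine; the real implementation is not shown in the post
public async tryReconnection(): Promise<void> {
  // Cancel live replications and drop the current handle without destroying local data
  this.stopReplication();
  await this.db.close();
  this.db = null!;

  // Re-open the database and rebuild indexes; replication is assumed to be
  // restarted by the caller once credentials are available again
  await this.initializeDatabase();
}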

In designing this solution, I made a conscious effort to minimize the use of setTimeout and Promises. As explained in my previous post on the event loop, creating these in large numbers adds real memory and scheduling overhead, especially under intense write load. By using them judiciously, I’ve kept the system efficient enough to handle high-frequency updates without unnecessary memory overhead.

It’s worth noting that in extensive testing, including simulated high-load scenarios, this reconnection mechanism was not triggered even once. This speaks to the robustness of the primary error handling and retry logic. However, its presence provides an additional layer of resilience, crucial in an environment where data integrity is paramount.

The architectural foresight in implementing this “last gate” mechanism exemplifies a commitment to building a system that can gracefully handle even the most extreme edge cases, ensuring continuous operation and data consistency in critical scenarios.

4. Intelligent Conflict Resolution

To tackle the complex issue of conflicts, a conflict resolution strategy was implemented:

private async resolveConflict<T extends PouchContentBase>(newDoc: PouchDocument<T>): Promise<PouchDB.Core.Response> {
  console.error(`Conflict detected for document ${newDoc._id}, attempting to resolve`);
  const existingDoc = (await this.pouch.getDoc(newDoc._id)) as PouchDocument<T>;
  const mergedDoc = this.smartMerge(newDoc, existingDoc);

  return await this.pouch.putDoc(mergedDoc);
}

private smartMerge<T extends PouchContentBase>(
  newDoc: Partial<PouchDocument<T>>,
  existingDoc: PouchDocument<T>,
): PouchDocument<T> {
  const mergedDoc = { ...existingDoc };

  Object.keys(newDoc).forEach((key) => {
    if (key === '_id' || key === '_rev') {
      return;
    }

    const newValue = newDoc[key as keyof PouchDocument<T>];
    const existingValue = existingDoc[key as keyof PouchDocument<T>];

    if (this.isScalar(newValue) || this.isScalar(existingValue)) {
      mergedDoc[key as keyof PouchDocument<T>] = newValue as any;
    } else if (Array.isArray(newValue) && Array.isArray(existingValue)) {
      mergedDoc[key as keyof PouchDocument<T>] = this.mergeArrays(newValue, existingValue) as any;
    } else if (typeof newValue === 'object' && typeof existingValue === 'object') {
      mergedDoc[key as keyof PouchDocument<T>] = this.smartMerge(
        newValue as Partial<PouchDocument<T>>,
        existingValue as any as PouchDocument<T>,
      ) as any;
    }
  });

  return mergedDoc;
}

This approach allows for intelligently merging conflicting documents, preserving important changes from both versions and reducing data loss. In a context where every piece of information can be crucial, this conflict resolution mechanism ensures that no critical updates are inadvertently overwritten.
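The isScalar and mergeArrays helpers used by smartMerge aren’t included above. A minimal sketch, assuming arrays are merged as a simple union (the real strategy may well match elements by id instead), could be:

// Sketches of the helpers referenced by smartMerge; the originals are not shown in the post
private isScalar(value: unknown): boolean {
  return value === null || value === undefined || typeof value !== 'object';
}

private mergeArrays<U>(newArray: U[], existingArray: U[]): U[] {
  // Assumption: keep existing items and append new ones that aren't already present
  const merged = [...existingArray];
  for (const item of newArray) {
    if (!merged.some((existing) => JSON.stringify(existing) === JSON.stringify(item))) {
      merged.push(item);
    }
  }
  return merged;
}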

5. Optimistic Updates with Rollback

To maintain a responsive user experience while ensuring data integrity, an optimistic update mechanism with rollback capabilities was implemented:

public updateDoc(id: string, updates: Partial<T>): Promise<PouchDB.Core.Response> {
  console.debug(`[EntityStoreServiceBase.${this.type}] updateDoc`, id, updates);
  // Optimistically update the in-memory store first so the UI responds immediately
  this.store.upsert(id, updates);

  // Then queue the persistent write; the returned promise settles only when it actually completes
  return this.queueOperation(id, 'update', updates);
}

private async _handleFailedUpdate(operation: EntityQueuedOperation<T>, error: any): Promise<void> {
  console.error(`Handling failed ${operation.operation} for document ${operation.id}`);

  try {
    const latestDoc = await this.pouch.getDoc(operation.id);

    if (operation.operation === 'update') {
      this.store.upsert(operation.id, latestDoc as T);
      console.debug(`Successfully reverted and updated document ${operation.id} in store`);
    } else if (operation.operation === 'remove') {
      this.store.upsert(operation.id, latestDoc as T);
      console.debug(`Re-added document ${operation.id} to store due to failed removal`);
    }
  } catch (fetchError) {
    console.error(`Failed to fetch latest version of document ${operation.id}:`, fetchError);

    this.store.remove(operation.id);
    console.debug(`Removed document ${operation.id} from store due to failed operation and fetch`);
  }

  console.error(`Operation failed for document ${operation.id}:`, error);

  this.updateQueue.delete(operation.id);
  operation.reject(error);
}

This system allows for immediately updating the synchronous store for a snappy user experience, crucial in fast-paced environments, while queuing the actual database update. In case of failure, the store can be rolled back to maintain consistency with the persistent layer, ensuring that users always see accurate, up-to-date information.
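As a hypothetical usage example (the entity type, import path, and calling code are illustrative, not taken from the original applications), a consumer of this service might look like:

// Hypothetical consumer code; 'Note' and the import path are illustrative
import { EntityStoreServiceBase } from './entity-store.service';

type Note = { title: string; body?: string };

async function renameNote(notesService: EntityStoreServiceBase<Note>, id: string): Promise<void> {
  try {
    // The in-memory store updates immediately, so the UI reflects the change right away...
    const response = await notesService.updateDoc(id, { title: 'Updated title' });
    console.debug('Persisted with revision', response.rev);
  } catch (error) {
    // ...while a failed write triggers the rollback path shown in _handleFailedUpdate
    console.error('Update failed; the store was rolled back to the persisted version', error);
  }
}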

The Impact: A More Robust and Responsive Ecosystem

By implementing these solutions at the core layer of the application stack, several significant improvements were achieved:

1. Enhanced Data Consistency: The intelligent queuing and conflict resolution mechanisms dramatically reduced instances of data misalignment between the store and database, crucial for maintaining accurate records.

2. Improved Resilience: The retry logic and smart processing helped the system gracefully handle periods of high load and temporary database issues, ensuring continuous availability of critical data.

3. Maintained Responsiveness: The optimistic update system, combined with the queuing mechanism, allowed for a snappy user interface even during complex data operations, supporting efficient workflows for users.

4. Reduced Data Loss: The conflict resolution and rollback capabilities significantly decreased instances of unintended data loss during concurrent updates, safeguarding vital information.

5. Scalability: By implementing these solutions at the core layer, the performance and reliability of all applications were improved without requiring extensive refactoring of individual components, a crucial consideration in our regulated environment.

Conclusion

Addressing data synchronization issues in a large-scale, multi-application environment is a complex challenge that requires a delicate balance of performance, reliability, and data integrity. By carefully analyzing the root causes of the problems and implementing a comprehensive solution at the core layer, I was able to dramatically improve the consistency and performance of the entire ecosystem.

The key architectural insights from this experience are:

1. Address systemic issues at the lowest common denominator when possible, allowing for widespread improvements without disruptive changes.

2. Implement intelligent queuing and processing to manage high-frequency updates, crucial in data-intensive applications.

3. Always plan for failure with robust retry and rollback mechanisms, ensuring data integrity even in edge cases.

4. Use optimistic updates judiciously to balance responsiveness with data accuracy, supporting efficient user workflows.

5. Invest in smart conflict resolution to preserve data in complex scenarios, critical when multiple users may update the same record.

While I’m proud of the robustness and elegance of this solution, I recognize that in the ever-evolving landscape of software development, continuous improvement and adaptation are necessary. I remain committed to refining this approach as new challenges emerge, always with the goal of supporting better user experiences through reliable, efficient technology.

By sharing this experience, I hope to contribute to the broader conversation on building resilient, scalable systems in critical domains. Remember, with the right architectural approach, even the most daunting technical challenges can be overcome, paving the way for innovations that can truly make a difference.

Bonus Treat: A Fresh Take on PouchDB with Reactive Goodness

Here’s a little dessert for the curious developer: a sneak peek at how you might whip up a reactive PouchDB store from scratch. This approach blends the robustness of PouchDB with the zingy flavors of RxJS, creating a deliciously responsive data management solution. Consider it a recipe for your next project’s secret sauce:

First, let’s look at a service that implements a reactive PouchDB store:

import { Injectable } from '@angular/core';
import PouchDB from 'pouchdb';
import PouchFind from 'pouchdb-find';
import { BehaviorSubject, Observable, defer, from, of } from 'rxjs';
import { map, tap, switchMap, filter, catchError, shareReplay, take } from 'rxjs/operators';
import {
  DocTypes,
  Document,
  AnyDocument,
  createDocument,
  addAttachment,
  Attachment,
  DocAttachmentKeys,
  NON_REPLICABLE_TYPES,
  getDocTypeFromId,
} from '../models';
import { environment } from 'src/environments/environment';

PouchDB.plugin(PouchFind);

const preloadDocsTypes: DocTypes[] = ['session', 'user'];

@Injectable({
  providedIn: 'root',
})
export class PouchDBService {
  private db!: PouchDB.Database;
  private stores = new Map<DocTypes, BehaviorSubject<Map<string, AnyDocument>>>();
  private dbReady$ = new BehaviorSubject<boolean>(false);
  private replications: { [key: string]: PouchDB.Replication.Sync<{}> } = {};

  constructor() {
    this.initializeDatabase().catch((error) => console.error('Database initialization failed:', error));
  }

  private get dbIsReady(): Observable<boolean> {
    return this.dbReady$.pipe(
      filter((ready) => ready),
      take(1),
      shareReplay(1),
    );
  }

  public getAll<T extends DocTypes>(type: T): Observable<Document<T>[]> {
    return this.dbIsReady.pipe(
      switchMap(() => this.getStore(type)),
      map((store) => Array.from(store.values())),
      catchError(() => of([])),
      shareReplay(1),
    );
  }

  public get<T extends DocTypes>(id: string): Observable<Document<T> | undefined> {
    return this.dbIsReady.pipe(
      switchMap(() => {
        const type = getDocTypeFromId(id);
        if (!type) {
          console.error(`Unable to determine document type for id: ${id}`);
          return of(undefined);
        }

        return this.getStore(type as T).pipe(
          switchMap((store) => {
            const doc = store.get(id) as Document<T> | undefined;
            return doc ? of(doc) : this.retrieveDocumentFromDatabase<T>(id);
          }),
        );
      }),
      catchError((error) => {
        console.error(`Error retrieving document with id ${id}:`, error);
        return of(undefined);
      }),
      shareReplay(1),
    );
  }

  public create<T extends DocTypes>(
    type: T,
    data: Partial<Document<T>>,
    attachment?: Attachment<T>,
  ): Observable<Document<T>> {
    let doc = createDocument(type, data) as Document<T>;

    if (attachment) {
      doc = addAttachment(doc, attachment.key, attachment.data, attachment.contentType);
    }

    return this.dbIsReady.pipe(
      switchMap(() => from(this.db.post(doc as PouchDB.Core.PostDocument<Document<T>>))),
      map((response) => ({ ...doc, _id: response.id, _rev: response.rev })),
      catchError((error) => {
        console.error(`Error creating document of type ${type}:`, error);
        throw error;
      }),
      shareReplay(1),
    );
  }

  public update<T extends DocTypes>(doc: Document<T>, attachment?: Attachment<T>): Observable<Document<T>> {
    let updatedDoc = { ...doc, updatedAt: Date.now() };

    if (attachment) {
      updatedDoc = addAttachment(updatedDoc, attachment.key, attachment.data, attachment.contentType);
    }

    return this.dbIsReady.pipe(
      switchMap(() => from(this.db.put(updatedDoc as PouchDB.Core.PutDocument<Document<T>>))),
      map((response) => ({ ...updatedDoc, _rev: response.rev })),
      catchError((error) => {
        console.error(`Error updating document: ${doc._id}`, error);
        throw error;
      }),
      shareReplay(1),
    );
  }

  public getAttachment<T extends DocTypes>(
    docId: string,
    attachmentId: DocAttachmentKeys<T>,
    options: { rev?: string } = {},
  ): Observable<Blob | Buffer> {
    return this.dbIsReady.pipe(
      switchMap(() =>
        defer(
          () =>
            new Promise<Blob>((resolve, reject) =>
              this.db.getAttachment(docId, attachmentId as string, options, (err, blob) =>
                err ? reject(err) : resolve(blob as Blob),
              ),
            ),
        ),
      ),
      catchError((error) => {
        console.error(`Error in getAttachment observable:`, error);
        return [];
      }),
      shareReplay(1),
    );
  }

  public deleteAttachment<T extends DocTypes>(
    docId: string,
    attachmentKey: DocAttachmentKeys<T>,
    rev: string,
  ): Observable<PouchDB.Core.Response> {
    return this.dbIsReady.pipe(
      switchMap(() => from(this.db.removeAttachment(docId, attachmentKey as string, rev))),
      catchError((error) => {
        console.error(`Error deleting attachment ${attachmentKey as string} for document ${docId}:`, error);
        throw error;
      }),
      shareReplay(1),
    );
  }

  public delete<T extends DocTypes>(doc: Document<T>): Observable<PouchDB.Core.Response> {
    return this.dbIsReady.pipe(
      switchMap(() => from(this.db.remove(doc))),
      tap(() => this.updateStore({ ...doc, _deleted: true } as AnyDocument)),
      catchError((error) => {
        console.error(`Error deleting document: ${doc._id}`, error);
        throw error;
      }),
      shareReplay(1),
    );
  }

  public clearAllData(): Observable<void> {
    return this.dbIsReady.pipe(
      switchMap(() => from(this.destroyDatabase())),
      switchMap(() => from(this.initializeDatabase())),
      tap(() => this.stores.forEach((store) => store.next(new Map()))),
      catchError((error) => {
        console.error('Error clearing all data:', error);
        throw error;
      }),
      shareReplay(1),
    );
  }

  public query<T extends DocTypes>(
    type: T,
    selector: PouchDB.Find.Selector,
    sort?: Array<string | { [propName: string]: 'asc' | 'desc' }>,
  ): Observable<Document<T>[]> {
    return this.dbIsReady.pipe(
      switchMap(() => from(this.db.find({ selector: { ...selector, type }, sort }))),
      map((result) => result.docs as Document<T>[]),
      tap((docs) => docs.forEach((doc) => this.updateStore(doc))),
      catchError((error) => {
        console.error(`Error querying documents of type ${type}:`, error);
        return of([]);
      }),
      shareReplay(1),
    );
  }

  public startReplication(userDBs: { [key: string]: string }): void {
    this.stopReplication();

    for (const [userKey, userDBUrl] of Object.entries(userDBs)) {
      const parsedUrl = new URL(userDBUrl);
      const { username, password } = parsedUrl;
      const remoteUrl = [
        environment.couchDBProtocol,
        username,
        ':',
        password,
        '@',
        environment.couchDBDomain,
        parsedUrl.pathname,
      ].join('');
      // Docs starting with '_' or in NON_REPLICABLE_TYPES are not replicated
      const filterFunction = (doc: any) => !doc._id.startsWith('_') && !NON_REPLICABLE_TYPES.includes(doc.type);

      this.replications[userKey] = this.db.sync(remoteUrl, {
        live: true,
        retry: true,
        filter: filterFunction,
      });
    }
  }

  public stopReplication(): void {
    Object.values(this.replications)
      .filter((replication) => replication)
      .forEach((replication) => replication.cancel());
    this.replications = {};
  }

  private async initializeDatabase() {
    if (this.db) return;
    this.db = new PouchDB('local-db'); // or any other name
    await Promise.all([
      this.db.createIndex({ index: { fields: ['type'] } }),
      this.db.createIndex({ index: { fields: ['createdAt'] } }),
    ]);
    this.setupChanges();

    await Promise.all(preloadDocsTypes.map((type) => this.loadAllDocuments(type)));

    console.log('Database initialized successfully');
    this.dbReady$.next(true);
  }

  private async destroyDatabase(): Promise<void> {
    if (this.db) {
      await this.db.destroy();
      this.db = null!;
      this.dbReady$.next(false);
    }
  }

  private setupChanges() {
    this.db.changes({ since: 'now', live: true, include_docs: true }).on('change', (change) => {
      if (change.doc) {
        this.updateStore(change.doc as AnyDocument);
      }
    });
  }

  private getStore<T extends DocTypes>(type: T): BehaviorSubject<Map<string, Document<T>>> {
    if (!this.stores.has(type)) {
      this.stores.set(type, new BehaviorSubject(new Map()));
      this.loadAllDocuments(type);
    }

    return this.stores.get(type) as unknown as BehaviorSubject<Map<string, Document<T>>>;
  }

  private updateStore(doc: AnyDocument) {
    const store = this.getStore(doc.type);
    const currentMap = new Map(store.value);
    if (doc._deleted) {
      currentMap.delete(doc._id);
    } else {
      currentMap.set(doc._id, doc);
    }
    store.next(currentMap);
  }

  private retrieveDocumentFromDatabase<T extends DocTypes>(id: string): Observable<Document<T> | undefined> {
    return from(this.db.get<Document<T>>(id)).pipe(
      filter((doc) => !!doc),
      tap((doc) => this.updateStore(doc as AnyDocument)),
      catchError(() => of(undefined)),
    );
  }

  private async loadAllDocuments<T extends DocTypes>(type: T) {
    try {
      const result = await this.db.find({ selector: { type } });
      const store = this.getStore(type);
      const newMap = new Map(result.docs.map((doc) => [doc._id, doc as Document<T>]));
      store.next(newMap);
    } catch (error) {
      console.error(`Error loading documents of type ${type}:`, error);
    }
  }
}

And here’s the corresponding model file for modularity:

import { customAlphabet } from 'nanoid';

// Define non-replicable document types
export const NON_REPLICABLE_TYPES: DocTypes[] = ['session'];

// Define and export attachment keys for each document type
export const AttachmentKeys = {
  user: {
    userImage: 'walkImage' as const,
  },
  session: {},
} as const;

// Type helper for attachment keys
export type AttachmentKeysType = typeof AttachmentKeys;
export type DocAttachmentKeys<T extends DocTypes> = keyof AttachmentKeysType[T];
export type Attachment<T extends DocTypes> = {
  key: DocAttachmentKeys<T>;
  data: Blob | Buffer | string;
  contentType: string;
};

// Define a type for the document schemas
const DocSchemas = {
  user: {
    name: '' as string,
    user_uid: '' as string,
  },
  session: {
    user_id: '' as string,
    user_uid: '' as string,
    name: '' as string,
    token: '' as string,
    refreshToken: '' as string,
    issued: 0 as number,
    expires: 0 as number,
    provider: '' as string,
    password: '' as string,
    userDBs: {} as { [key: string]: string },
  },
} as const;

// Infer types from the schemas
export type DocTypes = keyof typeof DocSchemas;
export type DocSchema<T extends DocTypes> = (typeof DocSchemas)[T];

// Base document type
export interface BaseDocument<T extends DocTypes> {
  _id: string;
  _rev: string;
  _deleted?: boolean;
  type: T;
  createdAt: number;
  updatedAt: number;
  _attachments: Partial<Record<DocAttachmentKeys<T>, PouchDB.Core.AttachmentData>>;
}

// Create a type that combines BaseDocument with a specific schema
export type Document<T extends DocTypes> = BaseDocument<T> & DocSchema<T>;

// Infer the union type of all documents
export type AnyDocument = Document<DocTypes>;

// Helper function to create a new document
export function createDocument<T extends DocTypes>(
  type: T,
  data: Partial<Document<T>>,
): Omit<Document<T>, '_rev'> {
  return {
    _id: data._id || generateId(type),
    type,
    createdAt: Date.now(),
    updatedAt: Date.now(),
    _attachments: {},
    ...DocSchemas[type],
    ...data,
  } as unknown as Omit<Document<T>, '_rev'>;
}

// Define a custom alphabet for nanoid (excluding similar-looking characters)
const alphabet = '12345ABCDEF';
// Create a nanoid function with our custom alphabet
const nanoid = customAlphabet(alphabet, 8);

/**
 * Generates a unique, sortable, and type-specific ID
 * @param type The document type (must be a key of DocSchemas)
 * @returns A string ID in the format: `${type}_${YYYYMMDD}_${randomString}`
 */
function generateId<T extends keyof typeof DocSchemas>(type: T): string {
  const date = new Date();
  const dateString = date.toISOString().slice(0, 10).replace(/-/g, ''); // YYYYMMDD
  const randomPart = nanoid();
  return `${type}_${dateString}_${randomPart}`;
}

// Helper function to extract the document type from an ID
export function getDocTypeFromId(id: string): DocTypes | undefined {
  const [type] = id.split('_');
  return Object.keys(DocSchemas).includes(type) ? (type as DocTypes) : undefined;
}

// Helper function to add an attachment
export function addAttachment<T extends DocTypes>(
  doc: Document<T>,
  key: DocAttachmentKeys<T>,
  data: Blob | Buffer | string,
  contentType: string,
): Document<T> {
  return {
    ...doc,
    _attachments: {
      ...doc._attachments,
      [key]: {
        content_type: contentType,
        data: data,
      },
    },
  };
}

// Helper function to get an attachment
export function getAttachment<T extends DocTypes>(
  doc: Document<T>,
  key: DocAttachmentKeys<T>,
): PouchDB.Core.AttachmentData | undefined {
  return doc._attachments[key];
}

This reactive PouchDB store implementation offers several advantages:

  1. Reactivity: Changes to the database are automatically reflected in the observables, ensuring your UI always displays the most up-to-date data.
  2. Type safety: The use of generics and well-defined types ensures you’re always working with the correct document structures.
  3. Efficiency: By using Maps internally, lookups for individual documents are very fast (O(1) time complexity).
  4. Modularity: The separation of concerns between the store service and the document models makes the code easier to maintain and extend.
  5. Flexibility: The observable-based API integrates seamlessly with RxJS, allowing for powerful data transformations and combinations.

This approach provides a solid foundation for building complex, reactive applications with PouchDB while maintaining clean, modular code. It’s particularly well-suited for projects that require offline-first capabilities or real-time synchronization between clients.
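To give a feel for how this store might be consumed, here’s a hypothetical Angular component (the component, template, import paths, and field values are illustrative only):

import { Component } from '@angular/core';
import { Observable } from 'rxjs';
import { Document } from '../models';
import { PouchDBService } from './pouchdb.service'; // assumed file name

@Component({
  selector: 'app-user-list',
  template: `<ul><li *ngFor="let user of users$ | async">{{ user.name }}</li></ul>`,
})
export class UserListComponent {
  // Reactive: re-emits whenever 'user' documents change in the underlying PouchDB
  users$: Observable<Document<'user'>[]>;

  constructor(private pouchService: PouchDBService) {
    this.users$ = this.pouchService.getAll('user');
  }

  addUser(name: string): void {
    // create() resolves once the document has been written locally;
    // the change feed then pushes it into the 'user' store automatically
    this.pouchService.create('user', { name, user_uid: `uid-${Date.now()}` }).subscribe();
  }
}

Because getAll returns a long-lived observable backed by the per-type BehaviorSubject, newly created or replicated documents appear in the list without any manual refresh.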