Complex UI logic in Angular - Advanced search functionality with RxJS

October 20, 2020

In enterprise web applications, data-grids with filters are common. In certain situations, you also need to poll at a specified interval. Recently, I have been working on a project with such a requirement. The first implementation was imperative: it unsubscribed from observables manually and used setInterval. Almost every week a bug surfaced that originated from this polling logic, so I decided to fully refactor the code into a declarative, RxJS-based solution. It was a good opportunity to write marble tests as well.

I decided to create a blog post series about certain scenarios that can be a challenge to implement. This is the first part of the series.

Parts of the series
Advanced search functionality with RxJS <— You are here.

"Powered by Angular and RxJS"

If you would like to jump straight into the implementation, you can check out my ng-reusables git repository.

The Requirements

The user stories look something like the following:


As a user, I’d like to be able to use the filter inputs to provide filter parameters to the search action.

When I click on the search button, and the request takes too long, I’d like to be able to stop the request. In this case, keep the already fetched data in the data-grid.

I would also like to have a select, in which I can choose an interval value. When I trigger a search, it searches by polling at the provided interval. During polling, if I change the filters the next poll will use those filters.

As a user, I’d like the search button to display ‘Refresh’ when there are no changes to the form.


So let’s think it through and gather the problems that we need to solve.

  1. We need a Search button which changes its text and behaviour to Stop and Refresh based on the current state.
  2. We need a polling interval select near the search button. When we change it during polling, the interval should change accordingly.
  3. Searching and Polling should only start when the search button gets interacted with. (clicking or pressing enter)
  4. Whenever we search, and the response takes too long to arrive, we need the ability to stop the request.
  5. We need to be able to cache the previous value in case an error occurs.
  6. After a successful search, the search button should display Refresh. When the filter form changes in any way, display Search.
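
Requirements 1 and 6 boil down to a small decision table. Here is a minimal sketch of it, assuming a hypothetical buttonLabel helper that is not part of the final component. The template later in this post expresses the same rule with ngIf.

```typescript
type SearchToggleStates = 'IDLE' | 'SEARCH';
type ButtonLabel = 'Search' | 'Refresh' | 'Stop';

// Hypothetical helper for illustration only.
// While a search is running the button always offers to stop it.
// When idle, the label depends on whether the filter form has changed.
function buttonLabel(state: SearchToggleStates, formDirty: boolean): ButtonLabel {
  if (state === 'SEARCH') {
    return 'Stop';
  }
  return formDirty ? 'Search' : 'Refresh';
}
```

With this rule, an idle and untouched form yields Refresh, an idle and edited form yields Search, and a running search yields Stop regardless of the form.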

The action streams and their triggers

First, let’s set up our template, so we can visualise it. We have two inputs in this example, a username and an email input. Then we add a select element and bind it to the pollInterval form control. All the inputs are inside a form element, which is bound to the searchForm FormGroup.

We also have a button, which gets displayed when there is a value in the searchState$ observable. This observable is going to have a default value, so the button always gets displayed. We also bind a click handler to the button. It calls triggerSearchAction() with the actual state every time it gets clicked.

<form [formGroup]="searchForm">
  <label for="username">Username</label>
  <input id="username" type="text" name="username" formControlName="username" />
  <br>
  <label for="email">email</label>
  <input id="email" type="text" name="email" formControlName="email" />
  <br>
  <div *ngIf="searchForm.get('email').hasError('email')"> Invalid e-mail address.</div>
  <br>
  <select name="pollInterval" formControlName="pollInterval">
    <option [ngValue]="0">No polling</option>
    <option [ngValue]="5000">5 sec</option>
    <option [ngValue]="10000">10 sec</option>
  </select>

  <button *ngIf="searchState$ | async as state" (click)="triggerSearchAction(state)">
    <ng-container *ngIf="state === 'IDLE' && searchForm.dirty">Search</ng-container>
    <ng-container *ngIf="state === 'IDLE' && !searchForm.dirty">Refresh</ng-container>
    <ng-container *ngIf="state === 'SEARCH'">Stop</ng-container>
  </button>
</form>

<your-data-grid [data]="searchResults$ | async"> </your-data-grid>

We also display the different button texts in separate ng-containers. This way our button stays accessible. Whenever the state changes, only the button’s text changes. The button does not get removed from the DOM, therefore, it does not lose the focus. It is rather important for keyboard-only users. Finally, we have a data-grid component, and we bind the searchResults$ observable to it as a data source.

In our component, we set up the searchForm and add validators to its controls. In the class constructor, we also call markAsDirty() on it. Our searchStateSub is a BehaviorSubject with an initial value of IDLE, so searchState$ starts with IDLE and, because the form is dirty, our button displays Search. When the searchForm is not dirty, the button displays Refresh instead. The searchAction$ observable sets the search state based on the emitted action, and the triggerSearchAction() method decides which action to emit.

type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

interface SearchFormValue {
  username: string
  email: string
  pollInterval: number
}

@Component({
  selector: 'search-polling-example',
  templateUrl: './search-polling-example.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class SearchPollingExampleComponent {
  readonly searchForm = this.formBuilder.group({
    username: ['', []],
    email: ['', [Validators.email]],
    pollInterval: [0, []],
  })

  private searchActionSub = new Subject<SearchTriggers>();
  private searchStateSub = new BehaviorSubject<SearchToggleStates>('IDLE');

  readonly searchState$: Observable<SearchToggleStates> = this.searchStateSub.asObservable();

  private readonly searchAction$: Observable<SearchTriggers> = this.searchActionSub.asObservable().pipe(
    tap((action: SearchTriggers) => {
      this.searchStateSub.next(action === 'START' ? 'SEARCH' : 'IDLE');
    })
  );

  readonly searchResults$: Observable<User[]> = this.searchAction$
    // We need to implement the search and polling mechanism here using the 'fetchData' method.
    .pipe()

  constructor(private formBuilder: FormBuilder, private userService: UserService) {
    this.searchForm.markAsDirty()
  }

  triggerSearchAction(state: SearchToggleStates): void {
    this.searchActionSub.next(state === 'IDLE' ? 'START' : 'STOP')
  }

  fetchData(formValue: SearchFormValue): Observable<User[] | null> {
    return this.userService.fetchUsers(formValue)
    .pipe(
      catchError((error) => {
        console.error(error)
        return of(null)
      })
    )
  }
}

We also implemented the fetchData method. It accepts our searchForm’s values and fetches the data using the UserService. If the service returns with an error, we catch it, and we return a null value. This becomes important later when we need to fall back to a previously emitted value.
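
The toggle between the button state and the emitted action can be modelled as two pure functions. This is only a sketch for reasoning about the click flow: actionFor, stateAfter and simulateClicks are hypothetical names, and the real component also returns to IDLE when a search finishes on its own, which this model ignores.

```typescript
type SearchTriggers = 'START' | 'STOP';
type SearchToggleStates = 'IDLE' | 'SEARCH';

// Mirrors triggerSearchAction(): an idle button starts a search,
// a searching button stops it.
function actionFor(state: SearchToggleStates): SearchTriggers {
  return state === 'IDLE' ? 'START' : 'STOP';
}

// Mirrors the tap() inside searchAction$: START moves us to SEARCH,
// STOP moves us back to IDLE.
function stateAfter(action: SearchTriggers): SearchToggleStates {
  return action === 'START' ? 'SEARCH' : 'IDLE';
}

// Replays a number of button clicks and records every state we pass through.
function simulateClicks(initial: SearchToggleStates, clicks: number): SearchToggleStates[] {
  const states: SearchToggleStates[] = [initial];
  for (let i = 0; i < clicks; i++) {
    const current = states[states.length - 1];
    states.push(stateAfter(actionFor(current)));
  }
  return states;
}
```

Calling simulateClicks('IDLE', 3) walks through IDLE, SEARCH, IDLE and SEARCH, which is the sequence the button goes through when nothing else resets the state.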

Planning the logic

With the initial logic set up, we need to implement the polling, and the circuit breaking logic. We do this in our searchResults$ observable pipe with some operator chaining. Since this logic is going to surface in other parts of the application as well, it is best to create a custom RxJS operator for it. Before we jump right into coding, let’s summarise our needs and think ahead. This is going to be a higher-order operator, and it needs our searchForm and the data-fetching method. It also needs to be able to set the search state. So our function needs at least 3 input parameters, and it needs to return a function.

type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

export function searchPolling<T, F extends { pollInterval: number }>(
  searchForm: FormGroup,
  searchStateSub: Subject<SearchToggleStates> | ReplaySubject<SearchToggleStates> | BehaviorSubject<SearchToggleStates>,
  fetchData: (formValue: F) => Observable<T[] | null>
): (source$: Observable<SearchTriggers>) => Observable<T[]> {
  return (source$: Observable<SearchTriggers>) =>
    source$
      // Logic implementation comes here
      .pipe()
}

Our custom observable operator method should:

  1. emit an empty array as a default value.
  2. trigger search when the event emitted by the source is START and set the search state.
  3. trigger polling search when the provided FormGroup’s pollInterval property is not 0.
  4. stop a running polling search when the FormGroup’s pollInterval property gets set to 0.
  5. unsubscribe from fetching the data if it takes too long to arrive.
  6. emit the previously emitted value if a null value gets emitted. Either from the data retrieval function or when stopping a long request.
  7. throw an error if it is used in a not intended way.

The implementation

Default value and handling misuse

These are solid test cases. Since we are creating an operator method, we can write our tests using rxjs-marbles. Let’s start with the two easy tests: that our operator throws when it is misused, and that it emits an empty array as a default value. We use a hot observable as our source because its values are produced outside of our operator. It is an action stream, which we trigger by passing START and STOP into it. The expectation is a cold observable because the value comes from our operator.

describe(`SearchPolling Operator`, () => {
  let MOCK_FORM: FormGroup
  let MOCK_SUBJECT: ReplaySubject<any>

  beforeEach(() => {
    MOCK_FORM = new FormGroup({
      pollInterval: new FormControl(0),
    })
    MOCK_SUBJECT = new ReplaySubject(1)
  })

  it(`emits an empty array as a default value`, marbles((m) => {
      const source$ =   m.hot( '----')
      const expected$ = m.cold('x---', { x: [] })

      m.expect(
        source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
      ).toBeObservable(expected$)
  }));

  it(`should throw an error if it is misused`, marbles((m) => {
    const expectedError = new Error(
      `Search Polling can only accept 'START' and 'STOP' events, but you provided: 'WHATEVER'`
    )
    const source$ =   m.hot( '-a--', { a: 'WHATEVER' })
    const expected$ = m.cold('x#--', { x: [] }, expectedError)

    m.expect(
      source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
    ).toBeObservable(expected$)
  }));
});
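
If the marble strings are new to you, it can help to see them as a list of timed events. The following is a deliberately simplified model, not the parser rxjs-marbles uses: parseSimpleMarbles is a hypothetical helper that only understands '-' and single-character values, and it assumes one virtual time frame per character, which is how the polling tests later in this post line up with their pollInterval.

```typescript
interface MarbleEvent<T> {
  frame: number;
  value: T;
}

// Simplified illustration: '-' means "nothing happens in this frame",
// any other character is looked up in the values dictionary.
// Real marble syntax also has '|', '#', '^' and '()' groups, which are ignored here.
function parseSimpleMarbles<T>(marbles: string, values: Record<string, T>): MarbleEvent<T>[] {
  const events: MarbleEvent<T>[] = [];
  marbles.split('').forEach((char, frame) => {
    if (char === '-') {
      return;
    }
    if (!(char in values)) {
      throw new Error(`No value provided for marble '${char}'`);
    }
    events.push({ frame, value: values[char] });
  });
  return events;
}

// '-a--' from the misuse test: one event, in frame 1.
const misuse = parseSimpleMarbles('-a--', { a: 'WHATEVER' });
```

Read this way, the source in the second test stays silent in frame 0 and emits WHATEVER in frame 1, which is exactly where the expected stream has its error.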

In our first test we can see that even if the action source never emits a value, the result still starts with an empty array. In the second test, the source emits a WHATEVER value, and we expect an error in the output. So let’s update our operator to make these tests pass.

export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {

  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          // stop logic comes here
        } else if (event === 'START') {
          // search and search polling logic comes here
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      startWith<T[], T[]>([])
    )
}

We use a switchMap operator to convert our event stream into a data stream. This is important because switchMap unsubscribes from the previous inner stream whenever a new event arrives. We set up the branches for the STOP and START events, and inside the else branch we return an observable that errors. This way, other developers who use this operator get an explicit error instead of silent misbehaviour when they mistype an event, for example SOTP. Then we add the startWith operator with an empty array. This does exactly what we need: it starts the data stream with an empty array.
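
TypeScript already rejects a mistyped event at compile time, but values can still arrive untyped at runtime, which is why the else branch exists. As a sketch of the same check outside of RxJS, here is a type guard with the same error message. The names isSearchTrigger and assertSearchTrigger are hypothetical and do not appear in the operator.

```typescript
type SearchTriggers = 'START' | 'STOP';

// Narrowing type guard: true only for the two supported events.
function isSearchTrigger(event: unknown): event is SearchTriggers {
  return event === 'START' || event === 'STOP';
}

// Throws the same error the operator emits when it receives anything else.
function assertSearchTrigger(event: unknown): SearchTriggers {
  if (!isSearchTrigger(event)) {
    throw new Error(
      `Search Polling can only accept 'START' and 'STOP' events, but you provided: '${String(event)}'`
    );
  }
  return event;
}
```

Inside the operator we return throwError(...) instead of throwing directly, so the failure travels through the observable's error channel.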


Let’s implement the single search and the stop mechanism now. We need to display the previous values in our data-grid when we stop a search. When the service returns with an error, our data-fetch method will catch it and return null. We also need to set the search state back to IDLE. Updating our button text requires us to call markAsPristine() on our search form as well.

// in our test file
it(`should trigger search when the event emitted by the source is 'START' and set the search state to 'IDLE'.`, marbles(m => {
  const source$ =      m.hot( '--a-', { a: 'START' });
  const searchState$ = m.hot( '--s-', { s: 'IDLE' });
  const expected$ =    m.cold('x-y-', { x: [], y: ['test'] });

  m.expect(
    source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
  ).toBeObservable(expected$);
  m.expect(
    MOCK_SUBJECT.asObservable()
      .pipe(tap(_ => expect(MOCK_FORM.dirty).toBe(false)))
  ).toBeObservable(searchState$);
}));

This tests our simple search logic, so let’s implement the necessary code to make the test pass. In our else if block, we call the fetchData method with the search form’s value. We also pipe a tap operator onto it, which sets the search state to IDLE. The second tap operator, placed before startWith, calls markAsPristine(), which sets the search form to pristine, that is, ‘not dirty’.

export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          // stop logic comes here
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue();

          if (formData.pollInterval === 0) {
            return fetchData(formData).pipe(tap((_) => searchStateSub.next('IDLE')))
          } else {
            // polling logic implementation comes here.
          }

        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      tap(_ => searchForm.markAsPristine()),
      startWith<T[], T[]>([])
    )
}

Fetching the data and setting the search state were pretty straightforward. However, we still need to implement our circuit-breaking and caching logic. Let’s write our tests first.


Stop search and cache the previous value

In the first test, we simulate a slow internet connection. For that, we use the delay operator on our mocked fetchData method. In the second test, we need a more complex method for fetching the data. The mockFetchData method uses the test’s scope to increase a counter. When the counter reaches 2, it returns a null value. This simulates a network error on the third search request. The value that should get emitted must be the previous value.

// in our test file
it(`should unsubscribe from fetching the data if it takes too long to arrive and return the previously emitted value.`, marbles(m => {
  const source$ =   m.hot( '--ab', { a: 'START', b: 'STOP' });
  const expected$ = m.cold('x--x', { x: [] });

  m.expect(
    source$.pipe(
      searchPolling(
        MOCK_FORM,
        MOCK_SUBJECT,
        (formValue: any) => of(['test']).pipe(delay(200))  // simulate slow response.
      )
    )
  ).toBeObservable(expected$);
}));

it(`should emit the previous successful value, when the data fetch encounters an error and a null value is emitted`, marbles(m => {
  let counter = 0;
  const mockData = ['firstEmmit', 'secondEmmit', 'thirdEmmit']
  const mockFetchData = (formValue: any) => {
    if (counter === 2) {
      return of(null); // simulate caught error
    } else {
      const returnValue = mockData[counter]
      counter++
      return of([returnValue])
    }
  };

  const source$ =   m.hot( '--a-a-a', { a: 'START' });
  const expected$ = m.cold('x-y-z-z', { x: [], y: ['firstEmmit'], z: ['secondEmmit'] });

  m.expect(
    source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, mockFetchData))
  ).toBeObservable(expected$);

}));

We’ve set the search state to IDLE in a tap operator, but we also need to set it whenever we stop a search, and again later when we search with polling. So first we extract it into a function scoped inside our operator method. Then we emit a simple null value when a STOP event arrives. Since we are inside a switchMap operator, emitting this null value while a previous request is still pending unsubscribes from that request.

export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {

  function setSearchStateToIdle<V>(): (s$: Observable<V>) => Observable<V> {
    return (s$) => s$.pipe(tap((_) => searchStateSub.next('IDLE')))
  }

  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          return of<T[]>(null).pipe(setSearchStateToIdle())
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue()

          if (formData.pollInterval === 0) {
            return fetchData(formData).pipe(setSearchStateToIdle())
          } else {
            // polling logic implementation comes here.
          }
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      tap((_) => searchForm.markAsPristine()),
      startWith<T[], T[]>([]),
      scan((previous, next) => next || previous)
    )
}

The first emission is always going to be an empty array. The scan operator keeps the previously emitted value as its accumulator, so we only need to check the latest value: if it is null, we give back the previous value. In the worst-case scenario, the previous value is the initial empty array. This means the previous search results stay in memory.
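
To see the fallback in isolation, we can replay the accumulator over a plain array. This is a model of what scan does with our callback, not RxJS itself: keepPrevious is the same expression we pass to scan, while replay is a hypothetical helper that stands in for the stream, with the seed playing the role of the startWith value.

```typescript
// The accumulator we pass to scan: a null falls back to the previous value.
function keepPrevious<T>(previous: T[], next: T[] | null): T[] {
  return next || previous;
}

// Feeds a list of emissions through the accumulator and records
// what the data-grid would receive after each one.
function replay<T>(seed: T[], emissions: Array<T[] | null>): T[][] {
  const seen: T[][] = [seed];
  let current = seed;
  for (const next of emissions) {
    current = keepPrevious(current, next);
    seen.push(current);
  }
  return seen;
}

// Same data as the error test above: two successful fetches, then a caught error.
const gridValues = replay<string>([], [['firstEmmit'], ['secondEmmit'], null]);
```

Here gridValues ends with ['secondEmmit'] twice, matching the x-y-z-z expectation in the test. Note that an empty array is truthy in JavaScript, so a search that legitimately returns no rows still replaces the previous results. Only null triggers the fallback.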


Polling

Now we only need to implement the polling mechanism. When polling, changes in the search form should be taken into account. Whenever the pollInterval is set to zero, the polling should stop with the last request. When this occurs, the operator should unsubscribe from the search form’s valueChanges observable. Once the last search results are emitted, the search state is set to IDLE.

// in our test file
it(`should trigger polling search when the provided FormGroup's pollInterval property is not 0`, marbles((m) => {
  MOCK_FORM.setValue({
    pollInterval: 2,
  });
  const source$ =   m.hot( '--a------b', { a: 'START', b: 'STOP' })
  const expected$ = m.cold('x-y-y-y-yy', { x: [], y: ['test'] })

  m.expect(
    source$.pipe(
      searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test']))
    )
  ).toBeObservable(expected$)
}));

it(`should stop a running polling search when the FormGroup's pollInterval property gets set to 0`, marbles((m) => {
  MOCK_FORM.setValue({
    pollInterval: 2,
  });
  const source$ =      m.hot( '--a-------', { a: 'START' })
  const searchState$ = m.hot( '------s--', { s: 'IDLE' })
  const expected$ =    m.cold('x-y-y-y---', { x: [], y: ['test'] })

  m.expect(
    source$.pipe(
      searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])),
      // simulate setting the formControl on the third emission.
      scan((previous, current, index) => {
        if (index === 2) {
          MOCK_FORM.setValue({ pollInterval: 0 })
          MOCK_FORM.updateValueAndValidity()
        }
        return current
      })
    )
  ).toBeObservable(expected$)

  m.expect(MOCK_SUBJECT.asObservable()).toBeObservable(searchState$)
}));
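
The expected marbles in the first polling test follow from simple frame arithmetic. As a rough sketch, assuming one frame per marble character and a poll that fires immediately and then every pollInterval frames, we can compute the fetch frames with a hypothetical pollFrames helper. Treating a tick that lands exactly on the stop frame as cancelled is an assumption of this model, not a statement about RxJS scheduling.

```typescript
// Returns the frames at which a poll fires: first at startFrame,
// then every pollInterval frames, until stopFrame is reached.
function pollFrames(startFrame: number, pollInterval: number, stopFrame: number): number[] {
  if (pollInterval <= 0) {
    throw new Error('pollInterval must be greater than 0 when polling');
  }
  const frames: number[] = [];
  for (let frame = startFrame; frame < stopFrame; frame += pollInterval) {
    frames.push(frame);
  }
  return frames;
}

// START in frame 2, pollInterval of 2, STOP in frame 9.
const fetchFrames = pollFrames(2, 2, 9);
```

This gives fetches in frames 2, 4, 6 and 8. The STOP in frame 9 emits a null, which makes the operator repeat the previous value, hence the final yy in the expected marble string.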

We finally finish the implementation with the polling logic. To keep the code readable, we implement it in a separate function. In this function we subscribe to searchForm.valueChanges. We use the startWith operator again, this time to emit the initialFormValue parameter. Then we use the switchMap operator to implement the polling logic. If the pollInterval is 0, we must switch to a simple observable which emits once, because passing 0 to a timer as the period between emissions would be problematic. This can only occur when a polling search is already in progress. We stop the polling here and we set the search state to IDLE.

export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  // ...
  function usePolling(initialFormValue: F): Observable<T[]> {
    return searchForm.valueChanges.pipe(
      startWith<F, F>(initialFormValue),
      switchMap((formValue: F) => {
        const pollSource$: Observable<number> =
          formValue.pollInterval === 0
            ? of(0).pipe(setSearchStateToIdle())
            : timer(0, formValue.pollInterval)
        return pollSource$.pipe(map<number, F>((_) => formValue))
      }),
      switchMap(fetchData),
      takeWhile((_) => searchForm.getRawValue().pollInterval !== 0)
    )
  }
  // ...
}

When the pollInterval is greater than 0, we set a timer. Our pollSource$ observable would emit only numbers, so we map it back to the formValue. We use this to fetch the data in the next switchMap operator. Notice that we don’t set the search state to IDLE here, because we need to display the text Stop on our button. Lastly, we unsubscribe from the valueChanges observable using the takeWhile operator, which completes the stream when the provided predicate returns a falsy value. Without it, form changes made after the polling was stopped by setting the pollInterval to 0 would keep triggering simple searches.

type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

export function searchPolling<T, F extends { pollInterval: number }>(
  searchForm: FormGroup,
  searchStateSub: Subject<SearchToggleStates> | ReplaySubject<SearchToggleStates> | BehaviorSubject<SearchToggleStates>,
  fetchData: (formValue: F) => Observable<T[] | null>
): (source$: Observable<SearchTriggers>) => Observable<T[]> {
  function setSearchStateToIdle<V>(): (s$: Observable<V>) => Observable<V> {
    return (s$) => s$.pipe(tap((_) => searchStateSub.next('IDLE')))
  }

  function usePolling(initialFormValue: F): Observable<T[]> {
    return searchForm.valueChanges.pipe(
      startWith<F, F>(initialFormValue),
      switchMap((formValue: F) => {
        const pollSource$: Observable<number> =
          formValue.pollInterval === 0
            ? of(0).pipe(setSearchStateToIdle())
            : timer(0, formValue.pollInterval)
        return pollSource$.pipe(map<number, F>((_) => formValue))
      }),
      switchMap(fetchData),
      takeWhile((_) => searchForm.getRawValue().pollInterval !== 0)
    )
  }

  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          return of<T[]>(null).pipe(setSearchStateToIdle())
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue()

          return formData.pollInterval === 0
            ? fetchData(formData).pipe(setSearchStateToIdle())
            : usePolling(formData)
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      tap((_) => searchForm.markAsPristine()),
      startWith<T[], T[]>([]),
      scan((previous, next) => next || previous)
    )
}

And inside our component, we can update the searchResults$ observable.

export class SearchPollingExampleComponent {
  //...
  readonly searchResults$: Observable<User[]> = this.searchAction$.pipe(
    searchPolling<User, SearchFormValue>(this.searchForm, this.searchStateSub, this.fetchData.bind(this))
  )
  //...
}

Conclusion

We have implemented a rather complex logic declaratively, using RxJS. I hope this coding walkthrough will help you understand how to use some RxJS operators. If this article helped you in any way, please shout out to me on Twitter or share it.


Written by Balázs Tápai.
I will make you believe that I'm secretly three senior engineers in a trench-coat. I overcome complex issues with ease and complete tasks at an impressive speed. I consistently guide teams to achieve their milestones and goals. I have a comprehensive skill set spanning from requirements gathering to front-end, back-end, pipelines, and deployments. I do my best to continuously grow and increase my capabilities.

You can follow me on Twitter or Github.