Angular uses RxJS heavily — HttpClient, forms, and the router all return Observables. An Observable represents a stream of values over time.

Observable Basics

  import { Observable } from 'rxjs';

const stream$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

stream$.subscribe({
  next: value => console.log(value),   // 1, 2, 3
  complete: () => console.log('Done')
});
  

The $ suffix is a common convention for Observable variables.

Subscribing in Components

Always unsubscribe or use automatic cleanup to avoid memory leaks:

  import { Component, inject, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';
import { PostService } from './post.service';

@Component({ /* ... */ })
export class PostListComponent implements OnInit, OnDestroy {
  private postService = inject(PostService);
  private sub?: Subscription;

  ngOnInit() {
    this.sub = this.postService.getPosts().subscribe(posts => {
      this.posts = posts;
    });
  }

  ngOnDestroy() {
    this.sub?.unsubscribe();
  }
}
  

Modern alternative — use the async pipe in templates (auto-unsubscribes):

  @Component({
  template: `
    @for (post of posts$ | async; track post.id) {
      <p>{{ post.title }}</p>
    }
  `
})
export class PostListComponent {
  posts$ = inject(PostService).getPosts();
}
  

Import AsyncPipe for the async pipe in standalone components.

Common Operators

Operators transform, filter, or combine streams:

  import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { of } from 'rxjs';

// map — transform each value
of(1, 2, 3).pipe(map(x => x * 10)).subscribe(console.log); // 10, 20, 30

// filter — emit only matching values
of(1, 2, 3, 4).pipe(filter(x => x > 2)).subscribe(console.log); // 3, 4

// debounceTime — wait for pause (search inputs)
searchInput.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.api.search(term))
).subscribe(results => this.results = results);
  
Operator Purpose
map Transform each emitted value
filter Emit only values that pass a test
debounceTime Wait for a quiet period
switchMap Cancel previous inner Observable
catchError Handle errors in the stream
tap Side effects without changing data

switchMap for HTTP

Cancel in-flight requests when a new search term arrives:

  import { Subject, switchMap } from 'rxjs';

searchTerm$ = new Subject<string>();
results$ = this.searchTerm$.pipe(
  debounceTime(300),
  switchMap(term => this.http.get<Result[]>(`/api/search?q=${term}`))
);

onSearch(term: string) {
  this.searchTerm$.next(term);
}
  

Subject — Multicast Source

A Subject is both an Observable and an Observer:

  import { Subject } from 'rxjs';

private messageSource = new Subject<string>();
message$ = this.messageSource.asObservable();

sendMessage(msg: string) {
  this.messageSource.next(msg);
}
  

Use Subjects for component communication or simple event buses.

Observable vs Promise

Observable Promise
Values Multiple over time Single value
Lazy Yes — runs on subscribe Eager — runs immediately
Cancelable Yes (unsubscribe) No

HttpClient returns Observables so you can cancel requests and compose streams with operators.