Back-end
9 minute read

Building Reactive Apps with Redux, RxJS, and Redux-Observable in React Native

Sourabh has a broad and versatile skill set. His experience with mobile and back-end development enables him to build prototypes quickly.

In the growing ecosystem of rich and powerful web and mobile apps, there’s more and more state to be managed, like current user, list of items loaded, loading state, errors, and much more. Redux is one solution to this problem by keeping the state in a global object.

One of the limitations of Redux is that it doesn’t support asynchronous behavior out of the box. One solution for this is redux-observable, which is based on RxJS, a powerful library for reactive programming in JavaScript. RxJS is an implementation of ReactiveX, an API for reactive programming which originated at Microsoft. ReactiveX combines some of the most powerful features of the reactive paradigm, functional programming, the observer pattern and the iterator pattern.

In this tutorial, we’ll learn about Redux and its usage with React. We’ll also explore reactive programming using RxJS and how it can make tedious and complex asynchronous work very simple.

Finally, we’ll learn redux-observable, a library that leverages RxJS to do asynchronous work, and will then build an application in React Native using Redux and redux-observable.

Redux

As it describes itself on GitHub, Redux is “a predictable state container for JavaScript apps.” It provides your JavaScript apps with a global state, keeping state and actions away from React components.

In a typical React application without Redux, we have to pass data from the root node to children via properties, or props. This flow of data is manageable for small applications but can get really complex as your application grows. Redux allows us to have components independent of each other, thus we can use it as a single source of truth.

Redux can be used in React using react-redux, which provides bindings for React components to read data from Redux and dispatch actions to update the Redux state.

Redux

Redux can be described as three simple principles:

1. Single Source of Truth

The state of your whole application is stored in a single object. This object in Redux is held by a store. There should be a single store in any Redux app.


» console.log(store.getState())
« { user: {...}, todos: {...} }

To read data from Redux in your React component, we use the connect function from react-redux. connect takes four arguments, all of which are optional. For now, we’ll focus on the first one, called mapStateToProps.


/* UserTile.js */

import { connect } from 'react-redux';

class UserTile extends React.Component {
    render() {
        return <p>{ this.props.user.name }</p>
    }
}
function mapStateToProps(state) {
    return { user: state.user }
}

export default connect(mapStateToProps)(UserTile)

In the example above, mapStateToProps receives the global Redux state as its first argument and returns an object that will be merged with the props passed to <UserTile /> by its parent component.

2. State Is Read-Only

Redux state is read-only for React components, and the only way to change the state is to emit an action. An action is a plain object that represents an intention to change the state. Every action object must have a type field, and the value must be a string. Other than that, the contents of the action are totally up to you, but most apps follow a flux-standard-action format, which limits the structure of an action to only four keys:

  1. type Any string identifier for an action. Every action must have a unique action.
  2. payload Optional data for any action. It can be of any time and contains information about the action.
  3. error Any optional boolean property set to true if the action represents an error. This is analogous to a rejected Promise. string identifier for an action. Every action must have a unique action. By convention, when error is true, the payload should be an error object.
  4. meta Meta can be any type of value. It is intended for any extra information that is not part of the payload.

Here are two examples of actions:


store.dispatch({
    type: 'GET_USER',
    payload: '21',
});

store.dispatch({
    type: 'GET_USER_SUCCESS',
    payload: {
        user: {
            id: '21',
            name: 'Foo'
        }
    }
});

3. State Is Changed with Pure Functions

The global Redux state is changed using pure functions called reducers. A reducer takes the previous state and action and returns the next state. The reducer creates a new state object instead of mutating the existing one. Depending on the app size, a Redux store can have a single reducer or multiple reducers.


/* store.js */

import { combineReducers, createStore } from 'redux'

function user(state = {}, action) {
  switch (action.type) {
    case 'GET_USER_SUCCESS':
      return action.payload.user
    default:
      return state
  }
}

function todos(state = [], action) {
  switch (action.type) {
    case 'ADD_TODO_SUCCESS':
      return [
        ...state,
        {
          id: uuid(), // a random uuid generator function
          text: action.text,
          completed: false
        }
      ]
    case 'COMPLETE_TODO_SUCCESS':
      return state.map(todo => {
        if (todo.id === action.id) {
          return {
            ...todo,
            completed: true
          }
        }
        return todo
      })
    default:
      return state
  }
}
const rootReducer = combineReducers({ user, todos })
const store = createStore(rootReducer)

Similar to reading from the state, we can use a connect function to dispatch actions.


/* UserProfile.js */
class Profile extends React.Component {
    handleSave(user) {
        this.props.updateUser(user);
    }
}

function mapDispatchToProps(dispatch) {
    return ({
        updateUser: (user) => dispatch({
            type: 'GET_USER_SUCCESS',
            user,
        }),
    })
}

export default connect(mapStateToProps, mapDispatchToProps)(Profile);

RxJS

RxJS

Reactive Programming

Reactive programming is a declarative programming paradigm that deals with the flow of data in “streams” and with its propagation and changes. RxJS, a library for reactive programming in JavaScript, has a concept of observables, which are streams of data that an observer can subscribe to, and this observer is delivered data over time.

An observer of an observable is an object with three functions: next, error, and complete. All of these functions are optional.


observable.subscribe({
  next: value => console.log(`Value is ${value}`),
  error: err => console.log(err),
  complete: () => console.log(`Completed`),
})

The .subscribe function can also have three functions instead of an object.


observable.subscribe(
  value => console.log(`Value is ${value}`),
  err => console.log(err),
  () => console.log(`Completed`)
)

We can create a new observable by creating an object of an observable, passing in a function that receives a subscriber aka observer. The subscriber has three methods: next, error, and complete. The subscriber can call next with a value as many times as required, and complete or error in the end. After calling complete or error, the observable will not push any value down the stream.


import { Observable } from 'rxjs'
const observable$ = new Observable(function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
    subscriber.complete()
    clearInterval(intervalId);
  }, 1000);
});
observable$.subscribe(
  value => console.log(`Value is ${value}`),
  err => console.log(err)
)

The example above will print Value is hi after 1000 milliseconds.

Creating an observable manually every time can become verbose and tedious. Therefore, RxJS has many functions to create an observable. Some of the most commonly used ones are of, from, and ajax.

of

of takes a sequence of values and converts it into a stream:


import { of } from 'rxjs'
of(1, 2, 3, 'Hello', 'World').subscribe(value => console.log(value))
// 1 2 3 Hello World

from

from converts almost anything into a stream of values:


import { from } from 'rxjs'


from([1, 2, 3]).subscribe(console.log)
// 1 2 3


from(new Promise.resolve('Hello World')).subscribe(console.log)
// 'Hello World'


from(fibonacciGenerator).subscribe(console.log)
// 1 1 2 3 5 8 13 21 ...

ajax

ajax takes a string URL or creates an observable that makes an HTTP request. ajax has a function ajax.getJSON, which returns only the nested response object from AJAX call without any other properties returned by ajax():


import { ajax } from 'rxjs/ajax'

ajax('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {request, response: {userId, id, title, completed}, responseType, status}


ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {userId, id, title, completed}


ajax({ url, method, headers, body }).subscribe(console.log)
// {...}

There are many more ways to make an observable (you can see the full list here).

Operators

Operators are a real powerhouse of RxJS, which has an operator for almost anything that you will need. Since RxJS 6, operators are not methods on the observable object but pure functions applied on the observable using a .pipe method.

map

map takes a single argument function and applies a projection on each element in the stream:


import { of } from 'rxjs'
import { map } from 'rxjs/operators'

of(1, 2, 3, 4, 5).pipe(
  map(i=> i * 2)
).subscribe(console.log)
// 2, 4, 6, 8, 10

Map

filter

filter takes a single argument and removes values from the stream which return false for the given function:


import { of } from 'rxjs'
import { map, filter } from 'rxjs/operators'

of(1, 2, 3, 4, 5).pipe(
  map(i => i * i),
  filter(i => i % 2 === 0)
).subscribe(console.log)
// 4, 16

Filter

flatMap

flatMap operator takes a function that maps every item in the steam into another stream and flattens all the values of these streams:


import { of } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { flatMap } from 'rxjs/operators'

of(1, 2, 3).pipe(
  flatMap(page => ajax.toJSON(`https://example.com/blog?size=2&page=${page}`)),
).subscribe(console.log)
// [ { blog 1 }, { blog 2 }, { blog 3 }, { blog 4 }, { blog 5 }, { blog 6 } ]

FlatMap

merge

merge merges items from two streams in the order in which they arrive:


import { interval, merge } from 'rxjs'
import { pipe, take, mapTo } from 'rxjs/operators'

merge(
  interval(150).pipe(take(5), mapTo('A')),
  interval(250).pipe(take(5), mapTo('B'))
).subscribe(console.log)
// A B A A B A A B B B

Merge

A full list of operators is available here.

Redux-Observable

Redux-Observable

By design, all actions in Redux are synchronous. Redux-observable is a middleware for Redux that uses observable streams to perform asynchronous work and then dispatch another action in Redux with the result of that asynchronous work.

Redux-observable is based around the idea of Epics. An epic is a function that takes a stream of actions, and optionally a stream of state and returns a stream of actions.

function (action$: Observable, state$: StateObservable): Observable;

By convention, every variable which is a stream (_aka _observable) ends with a $. Before we can use redux-observable, we have to add it as a middleware in our store. Since epics are streams of observables and every action exiting this steam is piped back into the stream, returning the same action will result in an infinite loop.


const epic = action$ => action$.pipe(
    filter(action => action.type === 'FOO'),
    mapTo({ type: 'BAR' }) // not changing the type of action returned
                           // will also result in an infinite loop
)
// or
import { ofType } from 'redux-observable'
const epic = action$ => action$.pipe(
    ofType('FOO'),
    mapTo({ type: BAZ' })
)

Think of this reactive architecture as a system of pipes where every pipe’s output feeds back into every pipe, including itself, and also into Redux’s reducers. It’s the filters on top of these pipes that decide what goes in and what is blocked.

Let’s see how a Ping-Pong epic would work. It takes a ping, sends it to the server—and after the request completes—sends a pong back to the app.


const pingEpic = action$ => action$.pipe(
    ofType('PING'),
    flatMap(action => ajax('https://example.com/pinger')),
    mapTo({ type: 'PONG' })
)

Now, we are going to update our original todo store by adding epics and retrieving users.

import { combineReducers, createStore } from 'redux'
import { ofType, combineEpics, createEpicMiddleware } from 'redux-observable';
import { map, flatMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'


// ...
/* user and todos reducers defined as above */
const rootReducer = combineReducers({ user, todos }) 

const epicMiddleware = createEpicMiddleware();
const userEpic = action$ => action$.pipe(
    ofType('GET_USER'),
    flatMap(() => ajax.getJSON('https://foo.bar.com/get-user')),
    map(user => ({ type: 'GET_USER_SUCCESS', payload: user }))
)

const addTodoEpic = action$ => action$.pipe(
    ofType('ADD_TODO'),
    flatMap(action => ajax({
        url: 'https://foo.bar.com/add-todo',
        method: 'POST',
        body: { text: action.payload }
    })),
    map(data => data.response),
    map(todo => ({ type: 'ADD_TODO_SUCCESS', payload: todo }))
)
const completeTodoEpic = action$ => action$.pipe(
    ofType('COMPLETE_TODO'),
    flatMap(action => ajax({
        url: 'https://foo.bar.com/complete-todo',
        method: 'POST',
        body: { id: action.payload }
    })),
    map(data => data.response),
    map(todo => ({ type: 'COMPLEE_TODO_SUCCESS', payload: todo }))
)

const rootEpic = combineEpics(userEpic, addTodoEpic, completeTodoEpic)
const store = createStore(rootReducer, applyMiddleware(epicMiddleware))
epicMiddleware.run(rootEpic);

_Important: Epics are just like any other observable streams in RxJS. They can end up in a complete or error state. After this state, the epic—and your app—will stop working. So you must catch every potential error in the steam. You can use the __catchError__ operator for this. More information: Error Handling in redux-observable.

A Reactive Todo App

With some UI added, a (minimal) demo app looks something like this:

A Reactive Todo App
The source code for this app is available on Github. Try out the project on Expo or scan the QR code above in the Expo app.

A React, Redux, and RxJS Tutorial Summarized

We learned what reactive apps are. We also learned about Redux, RxJS, and redux-observable, and even created a reactive Todo app in Expo with React Native. For React and React Native developers, the current trends offer some very powerful state management options.

Once again, the source code for this app is on GitHub. Feel free to share your thoughts on state management for reactive apps in the comments below.

Understanding the basics

What is Redux?

Redux is a predictable, global state container for JavaScript apps. It abstracts the state management and the change propagation logic away from the application's React components.

What is reactive programming?

Reactive programming is a declarative programming paradigm that deals with data as a stream and the propagation of change of state through that stream.

What is ReactiveX?

ReactiveX is an API for asynchronous programming with observable streams. It combines the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

What is RxJS?

RxJS is an implementation library of ReactiveX API in JavaScript. It leverages the dynamic nature of JavaScript along with a vast and extensive ReactiveX API.

Who created ReactiveX and RxJS?

Erik Meijer, a computer scientist at Microsoft, created ReactiveX API and its implementation in .NET. RxJS was created as a port of Rx.NET by various engineers.

Is RxJS worth learning?

RxJS provides a vast API for handling asynchronous tasks with functional programming, which facilitates the use of actions like retry, throttling, exponential fallbacks, caching, error handling, and more.

What is redux-observable?

redux-observable is a library for handling asynchronous tasks in Redux. It works by capturing a dispatched action from Redux and does some asynchronous work on it. After the work is complete, `redux-observable` adds the response into the Redux state container by dispatching another action.

Can we use RxJS without React and Redux?

RxJS is an implementation of ReactiveX and can be used with any state management or UI library, or even with vanilla JavaScript. It provides a powerful API to manage async work in the form of observable streams. These streams can be connected to any other library or used directly. Outside JavaScript, you can find implementations of ReactiveX for common languages on reactivex.io.