Redux-saga vs Redux-observale
前言
之前工作中一直使用的 redux-saga 来处理非常复杂的异步场景, 在接触到 Rxjs 后马上就被这种编程方式所吸引。它通过使用 observable 序列搭配强大的多种操作符来处理各种异步和事件。
Think of RxJS as Lodash for events.
这里我们罗列一下常见的场景,来比较一下这两者的写法。
Metal Model
这两者的模式有着本质的区别:
Saga = Worker + Watcher
import API from '...'
function* Watcher(){
yield takeEvery('do_thing', Worker)
}
function* Worker() {
const users = yield API.get('/api/users')
yield put({type:'done', users})
}
Rxjs = Epic( Type + Operators )
import API from '...'
const Epic = action$ =>
action$
.ofType('do_thing')
.flatMap(()=>API.get('/api/users'))
.map(users=>({type:'done', users}))
场景比较
我们可以从一些最简单的场景开始。
Fetch User from /api/users/1
Saga:
import axios from 'axios'
function* watchSaga(){
yield takeEvery('fetch_user', fetchUser) // waiting for action (fetch_user)
}
function* fetchUser(action){
try {
yield put({type:'fetch_user_ing'})
const response = yield call(axios.get,'/api/users/1')
yield put({type:'fetch_user_done',user:response.data})
} catch (error) {
yield put({type:'fetch_user_error',error})
}
}
Rx:
import axios from 'axios'
const fetchUserEpic = action$ =>
action$
.ofType('fetch_user')
.map(()=>({type:'fetch_user_ing'}))
.flatMap(()=>axios.get('/api/users/1'))
.map(response=>response.data)
.map(user=>({type:'fetch_user_done', user}))
Fetch User from /api/users/1
(cancelable)
Saga:
import { take, put, call, fork, cancel } from 'redux-saga/effects'
import API from '...'
function* fetchUser() {
yield put({type:'fetch_user_ing'})
const user = yield call(API)
yield put({type:'fetch_user_done', user})
}
function* Watcher() {
while(yield take('fetch_user')){
const bgSyncTask = yield fork(fetchUser)
yield take('fetch_user_cancel')
yield cancel(bgSyncTask)
}
}
Rx:
const fetchUserEpic = action$ =>
actions$
.ofType('fetch_user')
.map(()=>({type:'fetch_user_ing'}))
.flatMap(()=>{
return Observable
.ajax
.get('/api/user/1')
.map(user => ({ type: 'fetch_user_done', user }))
.takeUntil(action$.ofType('fetch_user_cancel'))
})
连续执行序列
Saga:
function* worker1() { ... }
function* worker2() { ... }
function* worker3() { ... }
function* watcher() {
const score1 = yield* worker1()
yield put(({type:'show_score', score1})
const score2 = yield* worker2()
yield put(({type:'show_score', score2})
const score3 = yield* worker3()
yield put(({type:'show_score', score3})
}
Rx:
const score1Epic = action$ =>
action$
.ofType('score1')
.reduce(worker1)
.flatMap(score=>{
return Observable.merge(
{type:'show_score', score},
{type:'score2'}
)
})
const score2Epic = action$ =>
action$
.ofType('score2')
.reduce(worker2)
.flatMap(score=>{
return Observable.merge(
{type:'show_score', score},
{type:'score3'}
)
})
const score3Epic = action$ =>
action$
.ofType('score3')
.reduce(worker3)
.map(score=>({type:'show_score', score}))
Login, token, Logout, Cancel
with redux
const store = {
token: null,
isFetching: false
}
const tokenReducer = (state=null, action) => {
switch (action.type) {
case 'login_success':
return action.token
case 'login_error':
case 'login_cancel':
case 'logout'
return null
default:
return state
}
}
const isFetching = (state=false, action) => {
switch (action.type) {
case 'login_request':
return true
case 'login_success':
case 'login_error':
case 'login_cancel':
case 'logout'
return false
default:
return state
}
}
Saga
import { take, put, call, fork, cancel } from 'redux-saga/effects'
import Api from '...'
function* loginWatcher() {
const { user, password } = yield take('login_request')
, task = yield fork(authorize, user, password)
, action = yield take(['logout', 'login_error'])
if (action.type === 'logout') {
yield cancel(task)
yield put({type:'login_cancel'})
}
}
function* authorize(user, password) {
try {
const token = yield call(Api.getUserToken, user, password)
yield put({type: 'login_success', token})
} catch (error) {
yield put({type: 'login_error', error})
}
}
Rxjs
const authEpic = action$ =>
action$
.ofType('login_request')
.flatMap(({payload:{user,password}}) =>
Observable
.ajax
.get('/api/userToken', { user, password })
.map(({token}) => ({ type: 'login_success', token }))
.takeUntil(action$.ofType('login_cancel', 'logout'))
.catch(error => Rx.Observable.of({type:'login_error', error}))
logger
// ----- Saga ----- \\
while (true) {
const action = yield take('*')
, state = yield select()
console.log('action:', action)
console.log('state:', state)
}
// ----- Rxjs ----- \\
.do(value=>console.log(value))
Take latest request
// ----- Saga ----- \\
takeLatest()
// ----- Rxjs ----- \\
.switchMap()
Retry with delay (1000ms)
// ----- Saga ----- \\
... still thinking about it
// ----- Rxjs ----- \\
.retryWhen(errors=>{
return errors.delay(1000).scan((errorCount, err)=>{
if(errorCount < 3) return errorCount + 1
throw err
}, 0)
})
Error Handling
// ----- Saga ----- \\
try {
// ... do things
} catch (error) {
yield put({ type:'fetch_user_error',error })
}
// ----- Rxjs ----- \\
.catch(error => Observable.of({ type:'fetch_user_error', error }))
并发执行
// ----- Saga ----- \\
import { call } from 'redux-saga/effects'
const [users, repos] = yield [
call(fetch, '/users'),
call(fetch, '/repos')
]
// ----- Rxjs ----- \\
.flatMap(()=>{
return Observable.merge(promiseA, promiseB)
})
Throttling, Debouncing, Retrying
Saga
// ---- Throttling ---- \\
yield throttle(500, 'input_change', fun)
// ---- Debouncing ---- \\
yield call(delay, 500)
// ---- Retrying ---- \\
function* retryAPI(data) {
for(let i = 0; i < 3; i++) {
try {
const apiResponse = yield call(apiRequest, { data });
return apiResponse;
} catch(err) {
if(i < 3) {
yield call(delay, 2000);
}
}
}
throw new Error('API request failed');
}
Rx
// ---- Throttling ---- \\
.throtleTime(1000)
// ---- Debouncing ---- \\
.debouncing(1000)
// ---- Retrying ---- \\
.retry(3)