About Me
Sam Lee (misgod.tw@gmail.com)
A software engineer lives in Tainan
Work for htc
Most used Clojure, Scala and Java
Interested in Functional Programming
Interested in Machine Learning and Data Analyst
Sam Lee
2014/11/22 @Mosut x Taina.py x FP
Sam Lee (misgod.tw@gmail.com)
A software engineer lives in Tainan
Work for htc
Most used Clojure, Scala and Java
Interested in Functional Programming
Interested in Machine Learning and Data Analyst
A style of programming based on two key ideas: continuous time-varying behaviors, and event-based reactivity
aObservable.map(x -> x*x) //Square
.reduce((a, b) -> a+b) //Sum
.subscribe(x -> println(x)); //Show
You can transform & compose asynchronous operations.
High-level abstractions
Standard error handling
https://github.com/ReactiveX/RxJava
RxJava is a JVM implementation of Reactive Extensions
RxJava extends Observer pattern to support data/event and compose operators in abstract.
Built by Netflix
Support Java 6+ & Android 2.3+
Java 8 lambda support
single items | multiple items | |
---|---|---|
synchronous | T getData() | Iterable<T> getData() |
asynchronous | Future<T> getData() |
event | Iterable (pull) | Observable (push) |
---|---|---|
retrieve data | T next() | onNext(T) |
discover error | throws Exception | onError(Exception) |
complete | !hasNext() | onCompleted() |
http://csl.stanford.edu/~christos/pldi2010.fit/meijer.duality.pdf
pushman.subscribe(new Action1<Integer>() {
@Override
public void call(Integer x) {
println("receive: " + x);
}
});
image from http://slides.com/yaroslavheriatovych/frponandroid
image from http://slides.com/yaroslavheriatovych/frponandroid
aObservable.filter(new Func1<Integer, Boolean>() {
public Boolean call(Integer n) {
return n % 2 == 0;
}
})
.map(new Func1<Integer, Integer>() {
public Integer call(Integer n) {
return n * n;
}
})
.subscribe(new Action1<Integer>() {
public void call(Integer n) {
System.out.println(n);
}
});
WTF...
aObservable.filter(n -> n % 2 == 0)
.map(n -> n * n)
.subscribe(System.out::println);
but ... no java8 on Davilk(Android)
Retrolambda
Gradle plugin
Observable<List<String>> ob = Observable.just(aList);
Observable<String> ob2 = Observable.just("Some String");
List<String> aList = ...;
Observable<String> ob = Observable.from(aList);
ob = Observable.create(subscriber -> {
try {
for (String s : aList) {
if (subscriber.isUnsubscribed())
return;
subscriber.onNext(s);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
});
repeat( )
range( )
interval( )
timer( )
empty( )
error( )
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables
Observable.range(0, 5)
.map(x -> toBinaryString(x*x))
.subscribe(s -> println(s),
err -> err.printStackTrace(),
() -> println("done"));
0
1
100
1001
10000
done
Observable.range(1, 3)
.flatMap(x -> Observable.just(x).repeat(x))
.subscribe(System.out::println);
1
2
2
3
3
3
Observable is a Monad
- unit (return) ==> just
- join (bind, >>=) ==> flatmap
Observable.range(0, 10)
.filter(x -> (x % 2) == 0)
.subscribe(System.out::println);
0
2
4
6
8
https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables
Observable.range(1, 10)
.reduce((a, b) -> a*b)
.subscribe(System.out::println);
3628800
Observable<String> lower = Observable.from(new String[]{"a", "b", "c"});
Observable<String> upper = Observable.from(new String[]{"A", "B", "C"});
Observable.merge(lower,upper).subscribe(System.out::println);
/* Or */
lower.mergeWith(upper).subscribe(System.out::println);
a
b
A
c
B
C
Observable<String> lower = Observable.from(new String[]{"a", "b", "c"});
Observable<String> upper = Observable.from(new String[]{"A", "B", "C"});
lower.startWith(upper).subscribe(System.out::println);
A
B
C
a
b
c
Observable<String> lower = Observable.from(new String[]{"a", "b", "c"});
Observable<String> upper = Observable.from(new String[]{"A", "B", "C"});
Observable.zip(lower, upper, Pair::create)
.map(pair -> pair.first +"_" +pair.second)
.subscribe(System.out::println);
a_A
b_B
c_C
OnErrorResumeNext
OnErrorReturn
retry
https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
specify on which Scheduler a Subscriber should observe the Observable
specify which Scheduler an Observable should use when its subscription is invoked
PublishSubject
BehaviorSubject
AsyncSubject
ReplaySubject
Subscriber -----> Observable
PublishSubject<Integer> subject = PublishSubject.create();
//Observable
subject.map(x -> x*x)
.subscribe(o -> println(o));
subject.map(x -> x+1)
.subscribe(o -> println(o));
//Subscriber
subject.onNext(11);
subject.onNext(11);
subject.onCompleted();
Developing a robust app is painful
Developing a good UX app is painful
Thread, Executor, Handler, AsyncTask, Loader ...
RxJava can mitigate your pain
Retrofit supports methods with a return type of rx.Observable
@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);
Android specific bindings for RxJava.
Scheduler on main UI thread or a given Android Handler thread.
Reactive components for common Android use cases and UI widgets
/* API */
void getFromServer(String key, Action1<String> callback);
void getFromDB(String key, Action1<String> callback);
/* Code */
btnClick.setOnClickListener(new View.OnClickListener() {
public void onClick(View view) {
getFromDB("myid", new Action1<String>() {
public void call(String s) {
getFromServer(s, new Action1<String>() {
public void call(final String s) {
runOnUiThread(new Runnable() {
public void run() {
Toast.makeText(context, s, LENGTH_LONG).show();
}
});
/* ... a lot of }) ... */
btnClick.setOnClickListener(view ->
getFromDB("myid",
s -> getFromServer(s,
x -> runOnUiThread(
() -> Toast.makeText(context, x, LENGTH_LONG).show()))));
Shorter but not easy to read
/* API */
Observable<String> getFromServer(String key);
Observable<String> getFromDB(String key);
/* Code */
ViewObservable.clicks(btnClick)
.map(x -> "myid")
.observeOn(Schedulers.io())
.flatMap(this::getFromDB)
.flatMap(this::getFromServer)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(x -> Toast.makeText(context, x, LENGTH_LONG).show());
Observable<SearchResult> g = googleSearch.search(keyword).retry(3);
Observable<SearchResult> b = bingSearch.search(keyword).retry(3);
Observable<SearchResult> y = yahooSearch.search(keyword).retry(3);
Observable.merge(g, b, y)
.distinct(site -> site.url)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(site -> appendDataForUI() ,
err -> errorhandle(err));
PublishSubject<Object> subject = PublishSubject.create(); //Global Singleton
//...In Class A...
subject.filter(x -> x instanceof DataUpdateAction)
.subscribe( x -> ... doSomething ...);
//...In Class B...
subject.filter(x -> x instanceof DeleteAction)
.subscribe( x -> ... doSomething ...);
//...In Class C...
subject.filter(x -> x instanceof RefreshAction)
.subscribe( x -> ... doSomething ...);
subject.onNext(aDataUpdateAction);
subject.onNext(aDataUpdateAction;
subject.onNext(aRefreshAction);
private Observable<String> getSuggestion(String prefix) { ... }
Observable<Observable<List<String>>> o =
ViewObservable.text(aEditText)
.map(event -> event.text.toString())
.filter(x -> x.length() > 1)
.observeOn(Schedulers.io())
.map(x -> getSuggestion(x).toList());
Observable.switchOnNext(o)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(lst -> showSuggestion(lst));