RxJava应用-过滤一定时间内的重复输入源
文/ BlackSwift(简书作者)
原文链接:http://www.jianshu.com/p/94a1a016a461
简单的一个RxJava操作符的笔记
需求
过滤一定时间内重复的输入源。
举个例子,许多用户同时对服务器进行投票请求,为了防止刷票,我希望实现24小时内过滤重复IP的投票。
再举例一枚,用户在APP中下拉刷新时,可能无意识进行了多次刷新,但是这样大多数情况是低效的,我只希望获得在3秒内刷新动画中的首次请求。
如果用传统方法实现,可能一堆if与全局变量嵌在代码中,非常丑陋,既然RxJava是面向事件编程的方法,为何不将其转为管道操作呢?
实现
自己写一个操作符,它实现RxJava中的接口Observable.Operator
代码主要参考了distinct与debounce。
public class DistinctWithTimeout<T, U> implements Observable.Operator<T, T> {
//调度器,建议用主线程
final Scheduler scheduler;
//与Distinct中的select含义相同,用于判断两个时间是否重复
final Func1<? super T, ? extends U> selector;
final TimeUnit timeUnit;
final long timeInMilliseconds;
//被过滤时的回调
final Action1<T> action1;
public DistinctWithTimeout(long timeInMilliseconds, TimeUnit timeUnit,
Func1<? super T, ? extends U> selector, Scheduler scheduler, Action1<T> duplicateAction) {
this.timeUnit = timeUnit;
this.selector = selector;
this.action1 = duplicateAction;
this.timeInMilliseconds = timeInMilliseconds;
this.scheduler = scheduler;
}
@Override public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
return new Subscriber<T>() {
private Map<U, Long> lastMap = new WeakHashMap<>();
@Override public void onCompleted() {
subscriber.onCompleted();
}
@Override public void onError(Throwable e) {
subscriber.onError(e);
}
@Override public void onNext(T t) {
long now = scheduler.now();
U u = selector.call(t);
//解开装箱
long last = lastMap.get(u) == null ? (0) : (lastMap.get(u));
//如果没有超时
if (now - last >= timeUnit.toMillis(timeInMilliseconds)) {
lastMap.put(u, now);
subscriber.onNext(t);
} else {
//调用超时后的Action
action1.call(t);
}
}
};
}
}
本方法仅仅用于并发量不大的请求,比如界面编程中使用。如果是长间隔、大对象下,Map可能会内存爆炸的。在服务端真正使用的话可以用Redis替换掉上文的Map,比如它的expire属性。
举例
以过滤投票为例,首先确认了超时时间,接着确定了如何去验证请求是否重复,此步骤进行了一个同步的查询数据库请求。
RxBus.getInstance().observe()
.lift(
new DistinctWithTimeout<>(
24, TimeUnit.HOURS, /*超时时间*/
requst -> DatabeseHelper.getInstance().get(requst),/*验证重复的方法*/
Schedulers.io(),/*处理此操作符的线程池*/
requst -> showToast("在1天内不需要重复发送"))/*如果重复,返回的错误消息*/)
.doOnNext(...);
在Android中,下面是去掉重复请求事件的例子
EventBus.<SMSRequst>getInstance().observe()
.lift(
new DistinctWithTimeout<>(
10, TimeUnit.SECONDS,
SMSRequst::getMessage,
AndroidSchedulers.mainThread(),
requst -> showToast("在10秒内不需要重复发送")))
总结
-
函数式编程在面向事件的开发中使用起来非常爽。
-
多看RxJava的操作符源码,可以秒杀很多笔试题。
来自:RxJava