パルカワ2

最近はFlutterをやっています

複数のデータソースから値を取り出すのをRxJavaのconcatで実現する場合のエラー処理

blog.danlew.net

複数のデータソースから値を取り出すのをRxのconcatで実現するという記事を見て、なるほど!!と思った。ただ、エラーになった場合どうなるのかわからなかったので確認した。

以下のようなコードの場合は、localSourceのErrorがonErrorにきてremoteSourceのErrorはこない。(というか、そもそもremoteSourceが実行されない)

Single<String> localSource = Single.create(emitter -> {
    emitter.onError(new RuntimeException("1個目のError"));
});

Single<String> remoteSource = create(emitter -> {
    emitter.onError(new RuntimeException("2個目のError"));
});

Single.concat(localSource, remoteSource).subscribe(string -> Log.i("TEST", string), Throwable::printStackTrace);

localSourceの読み込みに失敗したら、remoteSourceのデータを使いたいと思ったので、onErrorReturnとfilterでどうにか出来ないかと思った。これでremoteSourceのErrorがonErrorにくるようになる。ちなみに空文字なのは、RxJavaはnullを許容しないからです。

Single.concat(localSource.onErrorReturn(__ -> ""), remoteSource)
  .filter(""::equals)
  .subscribe(string -> Log.i("TEST", string), Throwable::printStackTrace);

またはObservableにしてempty()を使うというのがコード自体は長くなるがfilterしなくていい(変な仕様を作らなくてよい)ので良い気がした。

Observable.concat(localSource.toObservable().onErrorResumeNext(Observable.empty()), remoteSource.toObservable())
                .first("default")
                .subscribe(string -> Log.i("TEST", string), Throwable::printStackTrace);

もっといい方法ありそう!!! あったら教えてください

追記:
教えて頂いた!Maybe!onErrorComplete() そもそも知らなかった…

io.reactivex.Single.create() でonSuccessしたあとにonErrorを実行するように書いたらどうなるのか

RxJava2の話です。
自分でSingle.create した時についうっかりonSuccessしたあとにonErrorを実行するようなコードを書いた場合どうなるのだろうか?と思ったのでやってみました。

Single<String> single = Single.create(emitter -> {
  emitter.onSuccess("test");
  emitter.onError(new RuntimeException("エラー"));
});

single.subscribe(string -> {
  Log.i("TEST", string);
}, throwable -> {
  Log.i("TEST", "onError");
});

こういうコードだとUndeliverableExceptionが出る。UndeliverableExceptionなので当然onErrorは実行されない。
では、onSuccessが2回実行されたらUndeliverableExceptionが出るのかと思ったけど、そうではないみたい。
以下のコードは 最初のonSuccessが1度実行されて終わる。

Single<String> single = Single.create(emitter -> {
  emitter.onSuccess("test");
  emitter.onSuccess("hoge");
});
single.subscribe(string -> {
  Log.i("TEST", string);
}, throwable -> {
  Log.i("TEST", "onError");
});

qiita.com

これを読んでると single()はonSuccessが2回実行されると死ぬでと書いてるのでRxJava2ではどうなのか確認してみたところ、RxJava2でも同様にエラーになったので同じく2つ目以降の値は流れてきてはいけないものに使うと認識した。

Observable.range(1, 5).single(100).subscribe(i -> {
  Log.i("TEST", String.valueOf(i));
}, Throwable::printStackTrace);

1個で良い場合は、first()を使うとよさそうだった。

Observable.range(1, 5).first(100).subscribe(i -> {
  Log.i("TEST", String.valueOf(i));
}, Throwable::printStackTrace);

追記:
紛らわしい感じだったので、タイトル直した。
複数回呼ばないほうが安心安全っぽいので基本はそうするのが良さそう

履歴から自分のツイートを検索してヒットしたツイートを消す

自分のツイートの全履歴は、twitterからダウンロード出来る。

 ag 黒歴史  | perl -p  -e 's/tweets.csv:\d+:\"(\d+)\",\".*/$1/g;' | xargs ruby delete.rb
require "twitter"

client = Twitter::REST::Client.new do |config|
  config.consumer_key        = "..."
  config.consumer_secret     = "..."
  config.access_token        = "..."
  config.access_token_secret = "..."
end


ARGV.each do |id|
  begin
    p id
    client.destroy_status(id)
  rescue Twitter::Error::NotFound => e
    p e
  end
end

古のツイートとか消せるので便利

RxLifecycleとPresenter

Presenterにどう渡すべきか?と思ってたけど、lifecycleオブジェクトを渡せばいいだけだった。

// Activity
new HogePresenter(this, lifecycle())

// Presenter
class HogePresenter {
  public HogePresenter(HogeViewCallback viewCallback, Observable<ActivityEvent> lifecycle) {
    this.viewCallback = viewCallback;
    this.lifecycle = lifecycle;
  }
  
  public void onCreate() {
    ...
    .compose(RxLifecycle.bindUntilEvent(lifecycle, ActivityEvent.DESTORY))
    ...
  }
}

RxLifeCycle試してる

Single.just(10)
    .doOnDispose(() -> Log.i("TEST", "dispose"))
    compose(bindUntilEvent(ActivityEvent.CREATE))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        i -> Log.i("TEST", String.valueOf(i)),
        throwable -> Log.i("TEST", throwable.toString()));

Single と Completable のときは、 onErrorにCancellationExceptionがくる。

Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(() -> Log.i("TEST", "dispose"))
    .compose(bindUntilEvent(ActivityEvent.CREATE))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnComplete(() -> Log.i("TEST", "onComplete"))
    .subscribe(
        i -> Log.i("TEST", String.valueOf(i)),
        throwable -> Log.i("TEST", throwable.toString()),
        () -> Log.i("TEST", "onComplete"));

ObservableとFlowableとMaybeは、onComplete()が実行されるとのこと。

Unsubscription RxLifecycle does not actually unsubscribe the sequence. Instead it terminates the sequence. The way in which it does so varies based on the type: Observable, Flowable and Maybe - emits onCompleted() Single and Completable - emits onError(CancellationException) If a sequence requires the Subscription.unsubscribe() behavior, then it is suggested that you manually handle the Subscription yourself and call unsubscribe() when appropriate.

RxLifeCycleのREADME

実際の処理はPresenterがやるので、bindUntilEvent(ActivityEvent.CREATE) をPresenterに渡す必要があるけど、どうPresenterに渡すのか?? あたりがこれだ!というのが自分の中にない。