Creating Observables
This section explains methods that create Observables.
- just( ): convert an object or several objects into an Observable that emits that object or those objects
- from( ): convert an Iterable, a Future, or an Array into an Observable
- repeat( ): create an Observable that emits a particular item or sequence of items repeatedly
- repeatWhen( ): create an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable
- create( ): create an Observable from scratch by means of a function
- defer( ): do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription
- range( ): create an Observable that emits a range of sequential integers
- interval( ): create an Observable that emits a sequence of integers spaced by a given time interval
- timer( ): create an Observable that emits a single item after a given delay
- empty( ): create an Observable that emits nothing and then completes
- error( ): create an Observable that emits nothing and then signals an error
- never( ): create an Observable that emits nothing at all
To convert any object (or objects) into an Observable that emits that object (or those objects) and then completes, pass that object (or those objects) into the just( ) method.
// Observable emits "some string" as a single item
def observableThatEmitsAString = Observable.just("some string");
// Observable emits the list [1, 2, 3, 4, 5] as a single item
def observableThatEmitsAList = Observable.just([1, 2, 3, 4, 5]);
// Observable emits 1, 2, 3, 4, and 5 as distinct items
def observableThatEmitsSeveralNumbers = Observable.just( 1, 2, 3, 4, 5 );
This has some similarities to the from( ) method, but note that if you pass an iterable to from( ), it will convert the iterable into an Observable that emits each of the items in the iterable, one at a time, while the just( ) method would convert the iterable into an Observable that emits the entire iterable as a single item.
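The difference can be seen by collecting the emissions of both variants. The following is a minimal sketch, assuming RxJava 1.x is on the classpath (here fetched with Grape; the version shown is an assumption about your setup) and using toBlocking( ) only to gather the results synchronously:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

def numbers = [1, 2, 3]

// just( ) treats the whole list as a single item.
def viaJust = Observable.just(numbers).toList().toBlocking().single()

// from( ) emits each element of the list as its own item.
def viaFrom = Observable.from(numbers).toList().toBlocking().single()

println viaJust   // [[1, 2, 3]]
println viaFrom   // [1, 2, 3]
```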
Note that if you pass null to just( ), the resulting Observable will not merely call onCompleted( ) without calling onNext( ). It will instead call onNext( null ) before calling onCompleted( ).
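As a small illustration of the null case, the sketch below compares just(null) with empty( ). It assumes RxJava 1.x (fetched with Grape; the version is an assumption) and collects the emissions into a list so that the null item becomes visible:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// just(null) emits one item (null) and then completes.
def withNull = Observable.just(null).toList().toBlocking().single()

// empty( ) completes without emitting anything.
def withNothing = Observable.empty().toList().toBlocking().single()

println withNull      // [null]
println withNothing   // []
```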
just( ) does not by default operate on any particular scheduler.
You can convert an object that implements Iterable into an Observable that emits each item in that iterable, or an object that implements Future into an Observable that emits the result of its get call, simply by passing the object to the from( ) method, for example:
myObservable = Observable.from(myIterable);
You can also do this with arrays, for example:
myArray = [1, 2, 3, 4, 5];
myArrayObservable = Observable.from(myArray);
This converts the sequence of values in the iterable object or array into a sequence of items emitted, one at a time, by an Observable.
You can convert an empty iterable (or array) into an Observable in this way. The resulting Observable will invoke onCompleted() without first invoking onNext().
Note that when the from( ) method transforms a Future into an Observable, such an Observable will be effectively blocking, as its underlying Future blocks.
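A minimal sketch of the Future variant follows. It assumes RxJava 1.x (fetched with Grape; the version is an assumption) and uses a FutureTask that has already been run, so the blocking get call returns immediately:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import java.util.concurrent.Callable
import java.util.concurrent.FutureTask

// A Future whose result is already available once run() returns.
def task = new FutureTask<String>({ 'done' } as Callable<String>)
task.run()

// from(future) emits the value returned by the Future's get call, then completes.
def result = Observable.from(task).toBlocking().single()

println result   // done
```

If the Future were still pending, subscribing would block the subscribing thread until the result arrived.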
Note: in the Scala language adaptor for RxJava, the version of this method that works with sequences (arrays) is called items( ).
from( ) does not by default operate on any particular scheduler. The variant that converts a Future into an Observable has a version that accepts a scheduler as a parameter, which will be the scheduler that governs that Future.
- javadoc: from(future), from(future, timeout, unit), and from(future, scheduler)
- javadoc: from(iterable) and from(iterable, scheduler)
- javadoc: from(array)
- RxJS: fromArray
- RxJS: fromPromise
- Linq: ToObservable
repeat( ) creates an Observable that emits a particular item or sequence of items repeatedly. There are also versions of repeat( ) that repeat only a certain number of times before terminating.
repeat( ) operates by default on the trampoline scheduler, and has variants that allow you to specify which scheduler to operate on by passing it in as a parameter.
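The two forms of repeat( ) can be sketched as follows, assuming RxJava 1.x (fetched with Grape; the version is an assumption). The unbounded form is limited here with take( ) so that the example terminates:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// repeat(count): emit the source sequence three times in total, then complete.
def bounded = Observable.just('a', 'b').repeat(3).toList().toBlocking().single()

// repeat( ): repeat indefinitely; take(5) unsubscribes after five items.
def limited = Observable.just(1).repeat().take(5).toList().toBlocking().single()

println bounded   // [a, b, a, b, a, b]
println limited   // [1, 1, 1, 1, 1]
```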
- javadoc: repeat( ) and repeat(scheduler)
- javadoc: repeat(count) and repeat(count, scheduler)
- Linq: Repeat
- RxJS: repeat
repeatWhen( ) creates an Observable that emits a particular item or sequence of items repeatedly, depending on the emissions of a second Observable.
The repeatWhen( ) operator is similar to repeat( ), but it decides whether to resubscribe to the source Observable and mirror its emissions again. Each time the source Observable completes, repeatWhen( ) emits a void item to a second Observable and observes the result. If that result is an emitted item, repeatWhen( ) resubscribes to the source and the process repeats; if that result is an onCompleted notification, repeatWhen( ) also completes.
repeatWhen( ) operates by default on the trampoline scheduler, and has a variant that allows you to specify which scheduler to operate on by passing it in as a parameter.
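One common use of this mechanism is to resubscribe after a pause. The sketch below assumes RxJava 1.x (fetched with Grape; the version is an assumption) and a Groovy version that coerces closures to single-method interfaces. The 50 ms delay and the take(6) limit are illustrative choices, not part of the operator:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import java.util.concurrent.TimeUnit

// Each completion of the source arrives as an item on `completions`.
// Re-emitting it after a delay causes a delayed resubscription.
def polled = Observable.just(1, 2)
    .repeatWhen({ completions -> completions.delay(50, TimeUnit.MILLISECONDS) })
    .take(6)            // stop after three passes over the source
    .toList()
    .toBlocking()
    .single()

println polled   // [1, 2, 1, 2, 1, 2]
```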
You can create an Observable from scratch by using the create( ) method. You pass this method a function that accepts as its parameter the Subscriber that is passed to an Observable’s subscribe( ) method (or that is derived from the Observer that is passed to that method). Write the function you pass to create( ) so that it behaves as an Observable, calling the passed-in Subscriber’s onNext( ), onError( ), and onCompleted( ) methods appropriately. For example:
def myObservable = Observable.create({ aSubscriber ->
  try {
    for (int i = 1; i < 1000000; i++) {
      if (aSubscriber.isUnsubscribed()) {
        return;
      }
      aSubscriber.onNext(i);
    }
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onCompleted();
    }
  } catch(Throwable t) {
    if (!aSubscriber.isUnsubscribed()) {
      aSubscriber.onError(t);
    }
  }
})
NOTE: A well-formed finite Observable must attempt to call either the Subscriber’s onCompleted( ) method exactly once or its onError( ) method exactly once, and must not thereafter attempt to call any of the Subscriber’s other methods. It is good practice to check the Subscriber’s isUnsubscribed( ) state so that your Observable can stop emitting items or doing expensive calculations when there is no longer an interested Subscriber.
Note: in the Scala language adaptor for RxJava, this method is called apply( ).
create( ) does not by default operate on any particular scheduler.
- javadoc: create(OnSubscribe)
- RxJS: create
- Linq: Create
defer( ) does not create the Observable until a Subscriber subscribes, and it creates a fresh Observable on each subscription.
Pass defer( ) an Observable factory function (a function that generates Observables), and defer( ) will return an Observable that will call this function to generate its Observable sequence afresh each time a new Subscriber subscribes.
defer( ) does not by default operate on any particular scheduler.
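The effect of deferring creation can be sketched with a counter that is read when the Observable is built. This assumes RxJava 1.x (fetched with Grape; the version is an assumption) and a Groovy version that coerces closures to single-method interfaces; the counter is a hypothetical stand-in for any state that changes between subscriptions:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import java.util.concurrent.atomic.AtomicInteger

def counter = new AtomicInteger()

// The factory closure runs once per subscription, not when defer( ) is called.
def deferred = Observable.defer({ Observable.just(counter.incrementAndGet()) })

def beforeSubscribing = counter.get()           // factory has not run yet
def first = deferred.toBlocking().single()      // factory runs for subscriber 1
def second = deferred.toBlocking().single()     // factory runs again for subscriber 2

println beforeSubscribing   // 0
println first               // 1
println second              // 2
```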
- javadoc: defer(observableFactory)
- RxJS: defer
- Linq: Defer
To create an Observable that emits a range of sequential integers, pass the starting integer and the number of integers to emit to the range( ) method.
// myObservable emits the integers 5, 6, and 7 before completing:
def myObservable = Observable.range(5, 3);
In calls to range(n,m), a value of zero for m will result in no numbers being emitted (values less than zero will cause an exception). n may be any integer that can be represented as a BigDecimal: positive, negative, or zero.
range( ) does not by default operate on any particular scheduler but has a variant with which you can instruct it to operate on a particular scheduler by passing one in as a parameter.
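The edge cases of range( ) described above can be sketched as follows, assuming RxJava 1.x (fetched with Grape; the version is an assumption). In that version a negative count is rejected with an IllegalArgumentException when range( ) is called:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// A count of zero emits nothing and simply completes.
def none = Observable.range(5, 0).toList().toBlocking().single()

// The starting value may be negative.
def fromNegative = Observable.range(-2, 4).toList().toBlocking().single()

// A negative count is rejected.
def rejected = false
try {
    Observable.range(1, -1)
} catch (IllegalArgumentException e) {
    rejected = true
}

println none           // []
println fromNegative   // [-2, -1, 0, 1]
println rejected       // true
```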
- javadoc: range(start, count) and range(start, count, scheduler)
- RxJS: range
- Linq: Range
- Introduction to Rx: Range
To create an Observable that emits items spaced by a particular interval of time, pass the time interval and the units of time that interval is measured in (and, optionally, a scheduler) to the interval( ) method.
interval( ) operates by default on the computation scheduler and also has a variant with which you can choose which scheduler it operates on by passing it in as a parameter.
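A minimal sketch of interval( ), assuming RxJava 1.x (fetched with Grape; the version is an assumption). Because interval( ) never completes on its own, take(3) is used here to end the sequence, and the 50 ms period is an arbitrary illustrative value:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import java.util.concurrent.TimeUnit

// Emits 0, 1, 2, ... as Long values, one every 50 ms, on the computation scheduler.
def ticks = Observable.interval(50, TimeUnit.MILLISECONDS)
    .take(3)
    .toList()
    .toBlocking()
    .single()

println ticks   // [0, 1, 2]
```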
- javadoc: interval(interval, unit) and interval(interval, unit, scheduler)
- RxJS: interval
- Linq: Interval
- Introduction to Rx: Interval
The timer( ) method returns an Observable that, when subscribed to, waits for a span of time that you have defined, then emits a single zero and completes.
There is also a version of timer( ) that emits a single zero after a specified delay, and then emits incrementally increasing numbers at a specified period thereafter.
timer( ) operates by default on the computation scheduler and also has variants with which you can choose which scheduler it operates on by passing it in as a parameter.
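The single-shot form of timer( ) can be sketched as follows, assuming RxJava 1.x (fetched with Grape; the version is an assumption). The 200 ms delay is an arbitrary illustrative value:

```groovy
// Assumption: RxJava 1.x is fetched with Grape; adjust the version to your setup.
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import java.util.concurrent.TimeUnit

// Waits 200 ms after subscription, emits a single 0 (a Long), then completes.
def emitted = Observable.timer(200, TimeUnit.MILLISECONDS)
    .toList()
    .toBlocking()
    .single()

println emitted   // [0]
```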
- javadoc: timer(delay, unit) and timer(delay, unit, scheduler)
- javadoc: timer(delay, period, unit) and timer(delay, period, unit, scheduler)
- RxJS: timer
- Linq: Timer
- Introduction to Rx: Timer
- empty( ) creates an Observable that does not emit any items but instead immediately calls the Subscriber’s onCompleted( ) method.
- error( ) creates an Observable that does not emit any items but instead immediately calls the Subscriber’s onError( ) method.
- never( ) creates an Observable that does not emit any items, nor does it call either the Subscriber’s onCompleted( ) or onError( ) methods.
import rx.Observable;
println("*** empty() ***");
Observable.empty().subscribe(
{ println("empty: " + it); }, // onNext
{ println("empty: error - " + it.getMessage()); }, // onError
{ println("empty: Sequence complete"); } // onCompleted
);
println("*** error() ***");
Observable.error(new Throwable("badness")).subscribe(
{ println("error: " + it); }, // onNext
{ println("error: error - " + it.getMessage()); }, // onError
{ println("error: Sequence complete"); } // onCompleted
);
println("*** never() ***");
Observable.never().subscribe(
{ println("never: " + it); }, // onNext
{ println("never: error - " + it.getMessage()); }, // onError
{ println("never: Sequence complete"); } // onCompleted
);
println("*** END ***");
*** empty() ***
empty: Sequence complete
*** error() ***
error: error - badness
*** never() ***
*** END ***
empty( ), error( ), and never( ) do not by default operate on any particular scheduler.
- javadoc: empty()
- javadoc: error(throwable)
- javadoc: never()
- RxJS: empty and never
- Linq: Empty and Never
- Introduction to Rx: Simple factory methods
Copyright (c) 2016-present, RxJava Contributors.