- Published on
Android Thread: Understanding RxJava Backpressure
- Authors
- Name
- Dan Tech
- @dan_0xff
In the previous article, we covered RxJava and some of the support it offers for Java / Kotlin software development. In this article, we continue with an interesting concept in programming: Backpressure.
What is Backpressure?
Backpressure is when Supply exceeds Demand.
Some Real-World Examples
Example 1: At the ABC cinema, there is only one ticket booth. Today, a very hot new movie is released, causing hundreds of people to line up to buy tickets. The ticket booth can process a customer's request in 3 minutes. Currently, 200 people are waiting to buy tickets, and there are only 45 minutes until the movie starts.
At this point, even if the ticket booth works at full capacity, it can only process about 15 more customers by the time the movie starts. The rest will not be able to enter the theater in time to watch the movie. -> Backpressure on the manual ticket sales system of the cinema.
Example 2: In 2069, rainfall in the upper reaches is so heavy that water rushes toward the hydroelectric dams rapidly and unexpectedly. The dam's water control valves, although operating at full capacity, still cannot regulate the flow effectively. The water pressure on the dam system increases day by day. The directors and leaders of the hydroelectric dam unit are forced to decide to open the dam and discharge floodwater within the next 12 hours to reduce pressure on the water regulation system. -> Backpressure on the water regulation system of the hydroelectric dam, requiring a temporary solution to protect the dam's load-bearing capacity.
Some Programming Examples
Example 1: On Black Friday, the number of customers shopping on e-commerce platforms increases dramatically during the 11 AM and 11 PM hours. The current system cannot handle customer transactions smoothly. -> Backpressure on the login and payment systems of the shopping application.
Example 2: You are a Mobile Engineer implementing a Socket Connection to receive Real-time Crypto Market data. The Socket returns data to you approximately every 3-5 seconds. At this rate, the Mobile device can comfortably receive, process, and render the data in the application. However, one day Elon Musk posts a status on X.com to shill Doge coin, and everything goes crazy: traders enter and exit orders continuously, so the Crypto Market system sends you Market data every 500 ms. At this point, your mobile device freezes because too many Socket responses trigger too many UI renders -> uncomfortable experience for customers -> Backpressure on the processing & rendering system of the Mobile application.
Backpressure Concept in Programming
Backpressure is a phenomenon that occurs when there is an imbalance between the processing speed of the Producer and the Consumer: the Producer emits items faster than the Consumer can process them.
Learn this, win the Interview in 30s
When Does Backpressure Occur in RxJava?
In RxJava, we have Observable Objects that can Emit items, and Subscribers will receive these Items and process them according to their individual needs.
Backpressure occurs when the rate of calling onNext of the emitter is faster than the processing rate at the subscriber end.
Backpressure in Observable
Observable.create<LongArray> { emitter ->
    var i = 0L
    // dummy loop that keeps emitting until the subscriber disposes
    while (!emitter.isDisposed) {
        i += 1
        emitter.onNext(LongArray(DummyDataSize) { i }) // simulate large data
        Thread.sleep(10) // emit every 10 ms
    }
}
    .subscribeOn(Schedulers.io())          // the emitting loop runs on an io thread
    .observeOn(Schedulers.computation())   // items are handed over to a computation thread
    .subscribe({
        Thread.sleep(100) // simulate computation
        Log.d("BackpressureViewModel", "onRxJavaObservableBackpressure: ${it[0]}")
    }, {
        Log.d("BackpressureViewModel", "onRxJavaObservableBackpressure: Error: $it")
    })
Looking at the example above, the create block emits an item every 10 ms, but the subscriber takes up to 100 ms to process each one. So by the time the first item has been processed, about 9 more are already waiting; after 10 items have been processed, about 90 are waiting. Because observeOn on an Observable buffers pending items without any upper bound, the backlog keeps growing until the device runs out of resources and the app crashes.
The crash in this case is usually an OutOfMemoryError. The system cannot tell you that you have coded something incorrectly, because the code is not actually wrong. Memory is simply not being managed well, so this situation occurs.
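To make the growth of the backlog concrete, here is a small back-of-the-envelope sketch in plain Kotlin. It is only a model: it assumes a perfectly steady producer and consumer and counts completed emissions minus completed processing steps. The function names and the DummyDataSize value used in main are hypothetical and are not taken from the example project.

```kotlin
// Back-of-the-envelope model of the queue between producer and consumer.
// Assumes a steady producer (one item every emitEveryMs) and a steady
// consumer (one item every processEveryMs); real timings will jitter.
fun backlogAfter(elapsedMs: Long, emitEveryMs: Long = 10, processEveryMs: Long = 100): Long {
    val produced = elapsedMs / emitEveryMs
    val consumed = elapsedMs / processEveryMs
    return produced - consumed
}

// Rough payload size of the queued LongArrays (8 bytes per Long, object headers ignored).
fun estimatedBytes(backlog: Long, dummyDataSize: Int): Long = backlog * dummyDataSize * 8L

fun main() {
    val dummyDataSize = 100_000 // hypothetical value, not taken from the article
    for (elapsed in listOf(100L, 1_000L, 60_000L)) {
        val backlog = backlogAfter(elapsed)
        val megabytes = estimatedBytes(backlog, dummyDataSize) / 1_000_000
        println("after $elapsed ms: $backlog items queued, ~$megabytes MB")
    }
}
```

Under these assumptions the queue holds 9 items after 100 ms, 90 after 1 second, and 5400 after 1 minute, which is why the memory curve only ever goes up.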
Backpressure in Subject
val subject = PublishSubject.create<LongArray>()
subject
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe({
        Thread.sleep(100) // simulate computation
        Log.d("BackpressureViewModel", "onRxJavaSubjectBackpressure: ${it[0]}")
    }, {
        Log.d("BackpressureViewModel", "onRxJavaSubjectBackpressure: Error: $it")
    })
// dummy loop that emits items from outside the stream
var i = 0L
while (true) {
    i += 1
    subject.onNext(LongArray(DummyDataSize) { i }) // simulate large data
    Thread.sleep(10) // emit every 10 ms
}
Subject has a slightly different way of working compared to Observable, so you can see that we are allowed to actively emit items to the Subject in any place, without having to be in the scope of create like Observable.
In the 2 examples above, the larger the value you set for DummyDataSize, the faster the backlog fills the memory, and the sooner the OutOfMemoryError occurs.
Check out my source code on GitHub to learn more: https://github.com/dantech0xff/AndroidConcurrencyExamples
Approaches to Backpressure in RxJava
Left unhandled, Backpressure almost always ends in an OutOfMemoryError. Even before that happens, users already suffer a bad experience with the software: freezing, lagging, slowness, or an overheating device.
To handle Backpressure, there is only one way: don't let it happen. To do that, you can refer to the 3 directions below.
Throttle
Throttle - literally translated as strangling.
Throttle in RxJava spaces out the items of a data stream so that the subscriber receives at most one item per time window (t). The other items that fall inside the same window are dropped as redundant. There are 3 types of Throttle you can learn about.
Throttle First
observable.throttleFirst(1000, TimeUnit.MILLISECONDS)
Throttle First passes an item through and then drops every item emitted within a fixed time window after it.
Suppose the first item of throttleFirst(1000, TimeUnit.MILLISECONDS) is received at timestampMillis = t. During the window from t to t + 1000, no further items are delivered to the subscriber. The first item emitted from t + 1000 onwards is delivered, and t is reset to that item's timestamp.
Interview question: Does Throttle First receive all of the first and last items of the Observable?
Answer: Throttle First always receives the first item from the Observable. However, if the last item is emitted inside a throttled window, it is dropped and the subscriber never receives it.
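The answer above can be checked with a small sketch. It assumes RxJava 3 (the io.reactivex.rxjava3 packages) is on the classpath and uses TestScheduler as a virtual clock so nothing really sleeps. The function name throttleFirstDemo and the 400 ms spacing are made up for illustration.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 3 is available; throttleFirstDemo is a hypothetical helper.
fun throttleFirstDemo(): List<Int> {
    val scheduler = TestScheduler() // virtual clock, no real waiting
    val subject = PublishSubject.create<Int>()
    val observer = subject
        .throttleFirst(1000, TimeUnit.MILLISECONDS, scheduler)
        .test()

    // Items 1..5 are emitted at 0, 400, 800, 1200 and 1600 ms.
    for (item in 1..5) {
        if (item > 1) scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS)
        subject.onNext(item)
    }
    subject.onComplete() // completes at 1600 ms, inside the window opened by item 4
    return observer.values()
}

fun main() {
    println(throttleFirstDemo()) // [1, 4]
}
```

Item 1 opens a window until 1000 ms, so items 2 and 3 are dropped. Item 4 at 1200 ms opens the next window, and item 5, the last item of the stream, falls inside it and is lost.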
Throttle Last
observable.throttleLast(1000, TimeUnit.MILLISECONDS)
Throttle Last works on similar time windows, but the subscriber only receives the last item emitted in each window. The windows are periodic: the first one runs from t to t + 1000, where t is the time of subscribe and 1000 is the period passed to throttleLast, and each following window starts where the previous one ended.
Interview question: Does Throttle Last receive all the first and last Items from the Observable?
Answer: Throttle Last may not receive the first item from the Observable if another item is emitted after it within the same window. It may also miss the last item if onComplete arrives before the window that contains that item has ended.
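Here is a minimal sketch of that answer, again assuming RxJava 3 on the classpath and a TestScheduler as virtual clock. The helper name throttleLastDemo and the emission times are hypothetical.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 3 is available; throttleLastDemo is a hypothetical helper.
fun throttleLastDemo(): List<Int> {
    val scheduler = TestScheduler() // virtual clock, no real waiting
    val subject = PublishSubject.create<Int>()
    val observer = subject
        .throttleLast(1000, TimeUnit.MILLISECONDS, scheduler) // subscribed at 0 ms
        .test()

    // Items 1..5 are emitted at 0, 400, 800, 1200 and 1600 ms.
    for (item in 1..5) {
        if (item > 1) scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS)
        subject.onNext(item)
    }
    subject.onComplete() // completes at 1600 ms, before the second window closes at 2000 ms
    return observer.values()
}

fun main() {
    println(throttleLastDemo()) // [3]
}
```

When the first window closes at 1000 ms, item 3 is the most recent one, so items 1 and 2 are never delivered. Items 4 and 5 sit in the second window, which is cut short by onComplete, so the last item is lost as well.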
Throttle Latest
Throttle Latest is an approach that helps the Subscriber receive the first Item from the Observable like Throttle First. From the 2nd Item onwards, it will be processed like Throttle Last.
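A short sketch of this behaviour, under the same assumptions as before: RxJava 3 on the classpath, a TestScheduler as virtual clock, and a hypothetical helper named throttleLatestDemo.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 3 is available; throttleLatestDemo is a hypothetical helper.
fun throttleLatestDemo(): List<Int> {
    val scheduler = TestScheduler() // virtual clock, no real waiting
    val subject = PublishSubject.create<Int>()
    val observer = subject
        .throttleLatest(1000, TimeUnit.MILLISECONDS, scheduler)
        .test()

    // Items 1..5 are emitted at 0, 400, 800, 1200 and 1600 ms.
    for (item in 1..5) {
        if (item > 1) scheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS)
        subject.onNext(item)
    }
    subject.onComplete() // completes at 1600 ms, before the second window ends
    return observer.values()
}

fun main() {
    println(throttleLatestDemo()) // [1, 3]
}
```

Item 1 is delivered immediately, like Throttle First. When the window ends at 1000 ms, the most recent item (3) is delivered, like Throttle Last. With this overload the pending item 5 is not delivered when the stream completes.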
Throttle Application
Throttle can be used by the Consumer to distribute the frequency of receiving data evenly. Examples include: Game Event, Video Streaming, Web Socket Event, ...
Debounce
Debounce is a completely different approach from Throttle. While Throttle works with fixed time windows, Debounce delivers an item only after the debounce time has passed without any newer item. Every new item restarts the timer, so the wait can stretch out (possibly to infinity if the time between 2 emits always stays < debounce time).
Debounce can be used in processes where we need to limit spam events. Examples include: Action Click Event, Search Text, ...
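As an illustration of the Search Text case, here is a minimal sketch that assumes RxJava 3 on the classpath and drives time with a TestScheduler. The helper name debounceDemo and the typed strings are made up for this example.

```kotlin
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Assumption: RxJava 3 is available; debounceDemo is a hypothetical helper.
fun debounceDemo(): List<String> {
    val scheduler = TestScheduler() // virtual clock, no real waiting
    val queries = PublishSubject.create<String>()
    val observer = queries
        .debounce(1000, TimeUnit.MILLISECONDS, scheduler)
        .test()

    queries.onNext("r")                                   // t = 0 ms
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)
    queries.onNext("rx")                                  // t = 200 ms, restarts the timer
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)
    queries.onNext("rxj")                                 // t = 400 ms, restarts the timer
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS)  // quiet until 1400 ms
    queries.onNext("rxjava")                              // t = 1400 ms
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS)  // quiet until 2400 ms
    queries.onComplete()
    return observer.values()
}

fun main() {
    println(debounceDemo()) // [rxj, rxjava]
}
```

Only the values that are followed by a full second of silence reach the subscriber, so a search request would be sent twice instead of four times.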
Flowable
Flowable is not an operator in RxJava. Flowable is an Alternative to Observable with more powerful and convenient Backpressure handling support.
Flowable supports 5 Backpressure handling strategies, defined in BackpressureStrategy: MISSING, ERROR, BUFFER, DROP, and LATEST.
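To give a feel for how a strategy changes what the subscriber sees, here is a small sketch comparing two of them. It assumes RxJava 3 on the classpath. The helper names source, receiveWithDrop and receiveWithLatest are hypothetical, and the test subscriber deliberately requests only 5 items to play the role of a slow consumer.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable

// Assumption: RxJava 3 is available; all function names here are hypothetical.
// The producer pushes 1..1000 without looking at downstream demand,
// so the chosen strategy decides what happens to the overflow.
fun source(strategy: BackpressureStrategy): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 1..1000) emitter.onNext(i)
        emitter.onComplete()
    }, strategy)

// DROP: items that arrive while there is no demand are thrown away.
fun receiveWithDrop(): List<Int> =
    source(BackpressureStrategy.DROP).test(5).values()

// LATEST: only the most recent item is kept until the subscriber asks again.
fun receiveWithLatest(): List<Int> {
    val subscriber = source(BackpressureStrategy.LATEST).test(5)
    subscriber.requestMore(1) // the slow consumer is ready for one more item
    return subscriber.values()
}

fun main() {
    println(receiveWithDrop())   // [1, 2, 3, 4, 5]
    println(receiveWithLatest()) // [1, 2, 3, 4, 5, 1000]
}
```

In both cases memory stays bounded because the overflow is not queued. BUFFER, by contrast, would keep all the remaining items in memory until they are requested.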
Although Flowable has solutions to handle Backpressure, we can still hit an OOM crash if each emitted item is too large. In that case the buffer is not yet full, but the device's memory is already almost completely occupied, which leads to an OutOfMemoryError. When this happens, we have to go back, find where the memory issue occurs, and optimize it.
Happy Coding!