Prasham Trivedi

Using Coroutines

November 28, 2018


Talk

Introduction

Kotlin coroutines are one of the most powerful and natural ways to create and express multithreaded code in our apps. Let’s take a look at why we need multithreaded code and why coroutines are the best choice for it.

Generally there are at least two threads available to Android developers. One is the Main Thread, where we do all our UI-related work: layout, measure, animation and everything else the user can see and interact with. Other tasks that provide the user with meaningful data, like calling a remote service, querying a local database or reading a file, should be done on a separate thread, often called a Worker Thread. We should offload data fetching and any other non-UI task to a Worker Thread to keep the Main Thread as smooth as possible.

We should also try to run worker-thread requests in parallel wherever possible. For example, getting user information and getting the feed do not depend on each other and can run in parallel, so that both results load as fast as possible.

That leads us to a use case: a flow which has both sequential and parallel calls to our web services.

  1. I have a token, and I send it to the server to verify its validity.
  2. The server gives me a user ID. I have to fetch the user data using that.
  3. That user data has an image URL, and I have to load that image in the toolbar.
  4. I have to load feeds on the home screen.
  5. All feeds have an image.

Here 3 depends on 2, which depends on 1, so these calls should run in the sequence 1->2->3. Requests 3 and 4 should run in parallel. 5 is a bunch of requests that depend on 4, but all of the requests in 5 should run in parallel with each other.

From its inception in 2007 to today, Android and its threading have had quite an interesting history. Let’s see how we were (or were not) able to solve the above problem at different points in that history.

Threading: Brief History of AndroidKind….

Note: This is a pun on, and homage to, one of my favorite books, Sapiens: A Brief History of Humankind, also linked above.

In prehistoric times we used plain Java Threads, along with Services and Handlers to pass data and messages between two threads or between Activities and a Service (which we used to believe ran in a Worker Thread). But doing this was literally a headache.

In API 3 we were given AsyncTask, which was the recommended solution for accessing other threads and doing multithreaded work. It was a very robust API that required three types: the first for input, the second for progress and the third for output. Using four methods, AsyncTask gave us places to do something before the work (onPreExecute), do the work itself (doInBackground), publish progress (onProgressUpdate) and process our output (onPostExecute). All we had to do was define the right types and call the right methods, and everything would fall into place. For an API introduced in 2009, AsyncTask let us do work that is still useful today. This API was way ahead of its time.

But all was not well for AsyncTask. The API had its own set of problems. One was maintainability: it’s easy to get things wrong when both sequential and parallel calls are involved, and it’s easy to end up with a lot of AsyncTasks and lose track of the work. The code also leaked a lot of Contexts and was the reason behind many NullPointerExceptions and IllegalArgumentExceptions back in the day. Because of that, one of the most powerful APIs of its time also became one of the most misunderstood and misused. Many people didn’t know about the type parameters or weren’t aware of progress updates, and quickly slapped-together <String,Void,MyJson> AsyncTasks also gave us uncontrolled and unwelcome ProgressDialogs ruling whole screens.

And that pushed the community to look for alternatives. I myself have used one such alternative, IntentService with BroadcastReceiver. Then came 2015…

RxJava- The Million Dollar Baby

Whether you’re a Netflix subscriber or not, you’ll love them for porting Microsoft’s Reactive Extensions to Java and giving us RxJava back in 2013. By 2015 it had received a grand welcome from the Android community. It provided good APIs, compared to Services and AsyncTasks, to offload work to other threads. Everything was one peaceful stream. You call subscribeOn and pass a Scheduler to determine on which thread you want to create or process your background stream, and you call observeOn and pass another Scheduler to determine on which thread you want your output.

And in between there’s a whole world of things you can do: you can map, group, reduce and sum the data the way you want, and you can create one stream from another or join two streams for processing. There are a whole lot of operators to do anything you want. Just like Apple used the slogan “There’s an app for that” for their App Store, the Rx community used to say “There’s an operator for that”, because there is an operator to achieve almost anything you want.

Parallel and sequential calls were also made easy. If you wanted sequential calls, you used an operator that creates another Observable from your input, and that became your sequential call. For parallel calls, you made sure you didn’t use Schedulers.newThread() and subscribed to two separate Observables, which could then run in parallel, and you were good to go. At that time many Java 8 and JavaScript developers felt at home, as they had used Future and Promise, which have an API surface very similar to RxJava’s.

On top of everything, RxJava almost solved the leaking problem by providing easy-to-cancel APIs, and that reduced a lot of the NullPointerExceptions and other exceptions that happened because of Context leaks.

Callback hell- The confused programmer’s inception

But RxJava wasn’t free from problems, and one of the biggest was callback hell. RxJava’s main strength, the callback, also became its biggest source of confusion. Callback hell is a situation where a callback contains, or is managed by, yet another callback. RxJava, Future and Promise APIs all suffer from the same problem. In callback hell, handling and propagating exceptions is a nightmare. RxJava also has a tough learning curve, second only to Dagger and DI.
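To make the nesting concrete, here is a minimal sketch in plain Kotlin. The three callback-style functions (verifyToken, fetchUser, fetchAvatar) are hypothetical stand-ins invented for this illustration, not a real API, and they call back synchronously to keep the example runnable.

```kotlin
// Hypothetical callback-style API, invented for illustration only.
fun verifyToken(token: String, onResult: (userId: String) -> Unit) = onResult("13")
fun fetchUser(userId: String, onResult: (user: String) -> Unit) = onResult("user-$userId")
fun fetchAvatar(user: String, onResult: (image: String) -> Unit) = onResult("avatar-of-$user")

// Each dependent step has to live inside the previous step's callback.
fun loadToolbarImage(token: String, onDone: (String) -> Unit) {
    verifyToken(token) { userId ->
        fetchUser(userId) { user ->
            fetchAvatar(user) { image ->
                onDone(image) // three levels deep, and there is no error handling yet
            }
        }
    }
}

fun main() {
    loadToolbarImage("token") { image -> println(image) } // prints avatar-of-user-13
}
```

Every additional dependent call adds one more level of nesting, and an error path would have to be threaded through each level by hand.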

While we are talking about avoiding nulls and NPEs, one language was being developed and explored around the same time and became popular in 2016: Kotlin. Later, in 2017, the language got a feature called coroutines, the feature we are talking about.

Enter Coroutines.

Kotlin coroutines were introduced as an experimental feature in Kotlin 1.1, and since 1.3 basic coroutines are stable. Like AsyncTask, IntentService, RxJava or the Task APIs, coroutines are a way to offload work to another thread and to jump between threads.

And they achieve this while avoiding callbacks. When you’re using Kotlin coroutines, barring some minor details, your multithreaded code looks exactly like normal method calls. Using suspend, async, await and other builders, coroutines offer a smarter and cleaner way to manage multithreaded calls. And by the way, coroutines are as independent as threads, but a single coroutine is a lot lighter than a single thread doing the same work.
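To get a feel for how light coroutines are, here is a small sketch that assumes the kotlinx.coroutines library is on the classpath. The function name runMany and the numbers are made up for the illustration; it starts ten thousand coroutines, something that would be costly with one thread each.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Launches `count` coroutines that each wait a little and then bump a counter.
suspend fun runMany(count: Int): Int = coroutineScope {
    val finished = AtomicInteger(0)
    val jobs = List(count) {
        launch {
            delay(100L)                // suspends; no thread is blocked while waiting
            finished.incrementAndGet()
        }
    }
    jobs.joinAll()                     // wait for every coroutine to finish
    finished.get()
}

fun main() = runBlocking {
    println(runMany(10_000)) // prints 10000
}
```

All of these coroutines share the single thread of runBlocking here, because delay only suspends the coroutine instead of parking a thread.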

Here is one example of coroutine code.

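//2
GlobalScope.launch {
    // await() suspends this coroutine until the web service result is ready
    val userData = getUserDataFromWebservice().await()
    //4
    showUserData(userData)
}

//1
// UserData stands for whatever type your web service returns
suspend fun getUserDataFromWebservice(): Deferred<UserData> {
    //3
    return GlobalScope.async {
        // Do your retrofit work here
        return@async wsResult
    }
}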

The code above illustrates normal use of coroutines. We have defined a method to get UserData from a web service, and this method is marked with suspend. In the launch block, the code gets the user data just like a normal function call and then calls showUserData, which is meant to update the UI. The code looks almost like regular code, except that it hops threads, calls a web service and waits for the reply. We have used regular coroutine constructs like launch, async, await and suspend. They’re the building blocks of coroutines; let’s understand them a bit.

Building Blocks of Coroutines.

1- Suspending Function: A suspending function is marked with the suspend keyword. A long-running task should run in a suspending function. A suspending function (or suspending code) waits for its work to complete without blocking the thread or consuming CPU resources. Many I/O-bound tasks, such as calling a web service, querying a database or reading from files, fit well as suspending functions. Tasks that require computation, like searching a large array, processing a bitmap or running an ML model, can instead block a thread and consume a lot of CPU resources. Roman Elizarov, team lead of the coroutines team at JetBrains, wrote a nice article on blocking threads and suspending coroutines.

2- Launch: A suspending function can only be called from another suspending function or from a coroutine, and the lambda we pass to launch is a suspending function. In fact, launch is one of the most easily available entry points to coroutines. It takes a fire-and-forget approach: launch starts a coroutine and runs the statements in it sequentially to completion. The output of launch is a Job object, which is useful to join, cancel and complete executions.

3- Async: async is one of the blocks used in all async-await based threading mechanisms. In JavaScript (ES2017) async and await are keywords, while in coroutines they’re methods. async starts a new coroutine that runs the lambda passed to it and immediately returns a Deferred<T> object, where T is the actual data and Deferred is a Future- or Promise-like class.

4- Await: Deferred<T> exposes a method called await, which returns T. The output of async is called Deferred because its result is deferred: it only becomes available some time later. By default the lambda passed to async starts running right away (with CoroutineStart.LAZY it starts only when await or start is called). When you call await, the calling coroutine, not the thread, is suspended until the result is available.
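The first two building blocks can be sketched as follows, assuming kotlinx.coroutines is available. The function names (fetchGreeting, sumOfSquares, cancelAfter) are hypothetical, delay stands in for real I/O, and moving the computation to Dispatchers.Default with withContext is one possible way to keep CPU-heavy work off the calling thread.

```kotlin
import kotlinx.coroutines.*

// Suspending function: delay() stands in for a network call and does not block the thread.
suspend fun fetchGreeting(name: String): String {
    delay(50L)
    return "Hello, $name"
}

// CPU-bound work is moved to Dispatchers.Default so the calling thread stays free.
suspend fun sumOfSquares(upTo: Int): Long = withContext(Dispatchers.Default) {
    var total = 0L
    for (i in 1..upTo) total += i.toLong() * i
    total
}

// launch returns a Job that can be cancelled and joined.
suspend fun cancelAfter(millis: Long): Boolean = coroutineScope {
    val job: Job = launch {
        repeat(1_000) { delay(10L) }   // cancellation is noticed at each delay()
    }
    delay(millis)
    job.cancel()   // ask the coroutine to stop
    job.join()     // wait until it has actually finished
    job.isCancelled
}

fun main() = runBlocking {
    println(fetchGreeting("Kotlin"))   // Hello, Kotlin
    println(sumOfSquares(10))          // 385
    println(cancelAfter(50L))          // true
}
```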

These are the basic blocks of coroutines. No matter how far and how advanced you go, you will encounter these keywords and methods.
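For async and await specifically, a minimal sketch (again assuming kotlinx.coroutines, with the hypothetical helper slowValue standing in for a slow call) shows two things: two async blocks overlap in time, and a coroutine started with CoroutineStart.LAZY does not run until it is awaited.

```kotlin
import kotlinx.coroutines.*

suspend fun slowValue(value: Int): Int {
    delay(100L) // stands in for a slow call
    return value
}

// Both async blocks start right away, so the two delays overlap.
suspend fun sumInParallel(): Int = coroutineScope {
    val first: Deferred<Int> = async { slowValue(20) }
    val second: Deferred<Int> = async { slowValue(22) }
    first.await() + second.await()
}

// With CoroutineStart.LAZY nothing runs until await() (or start()) is called.
suspend fun lazyStartedEarly(): Boolean = coroutineScope {
    val lazy = async(start = CoroutineStart.LAZY) { slowValue(1) }
    delay(50L)
    val activeBeforeAwait = lazy.isActive // still in the "new" state here
    lazy.await()                          // this is what actually starts it
    activeBeforeAwait
}

fun main() = runBlocking {
    println(sumInParallel())     // 42
    println(lazyStartedEarly())  // false
}
```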

Using these blocks, the solution to the problem we defined earlier can be expressed with coroutines as follows.

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    //sampleStart
    launch{
        val userId = getUserIdFromToken().await() //1
        val userData = getUserData(userId) //2
        val feeds = getFeeds(userId)   //3
        val user = userData.await()   //4
        val feedData = feeds.await()//4
        println(userId)
        println(user)             //5
        println("Image")          //6
        println(feedData.joinToString()) //6
        for(feed in feedData){            //7
            showFeedImage(getFeedImages(feed))
            //8
        }
    }
    //sampleEnd
}



fun CoroutineScope.showFeedImage(image:Deferred<String>){
    // Launch in the caller's scope (not GlobalScope) so that runBlocking waits for the image.
    launch{
        println(image.await())
    }
}

suspend fun CoroutineScope.getFeedImages(imageUrl:String)=async{
    println("ImageEnter $imageUrl")
    delay(30L)
    println("ImageExit $imageUrl")
    return@async imageUrl
}

suspend fun CoroutineScope.getUserIdFromToken()=async{
    println("Id Enter")
    delay(100L)
    println("Id Exit")
    return@async "13"
}

suspend fun CoroutineScope.getUserData(userId:String)=async{
    println("Data Enter $userId")
    delay(100L)
    println("Data Exit")
    return@async "UserData Json"
}

suspend fun CoroutineScope.getFeeds(userId:String)=async{
    println("Feed Enter $userId")
    delay(100L)
    println("Feed Exit")
    return@async arrayOf("Feeds1","Feeds2","Feeds3")
}

In 1, we ask our server to validate the token and give us the userId, and we wait till the userId arrives. Once it does, we start two new async calls to get the user data and the feeds (2 & 3). Both of these calls start in parallel, and in 4 we wait for user and feedData to return. Once the user data is available, we show it (5) and download and show the image (6); both of these depend on getting the user data. Similarly, once feedData comes in, we can show the feeds (6). And for every feed (7) we launch parallel calls to download the feed images and wait for each one to respond (8).

Here we have launched one parent coroutine and solved our problem by launching multiple coroutines inside that parent. Coroutines thus support a parent-child relationship, and that led to a significant change when coroutines went stable.

Structured Concurrency

Parent-child relationships have existed in coroutines from the beginning. But they carried one problem: cancelling a parent didn’t always guarantee cancelling its children, so there was a chance that child coroutines could leak. That was a problem until coroutines v0.26. In v0.26, which was essentially an RC version, this problem was solved, and solving it introduced one major change called Structured Concurrency, which brought big changes to how coroutines work and, mostly, to how the APIs are called.

Before v0.26, we could call the launch or async builders directly. Since 0.26, almost all coroutine builders are scoped to an interface called CoroutineScope, which helps cancel child coroutines as soon as the parent coroutine is cancelled. Our direct calls to launch or async become CoroutineScope.launch or CoroutineScope.async. One easily available CoroutineScope is GlobalScope, which, as the name suggests, is scoped to the entire application: coroutines launched in it are not tied to any Job and live until they finish or the app process ends. To take more control over the lifecycle, we can make our own scope. Let’s look at an example.

class MainActivity : AppCompatActivity(), /**1**/ CoroutineScope {
    //3
    private lateinit var job: Job

    //2
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        ...
        //4
        job = Job()
        ...
    }

    override fun onDestroy() {
        //5
        job.cancel()
        super.onDestroy()
    }
}

Here, we want to tie the coroutine scope to the lifecycle of our activity; in fact, we can tie a scope to any class whose lifecycle we know or control. To do that, our class needs to implement the CoroutineScope interface (1), which forces us to override coroutineContext (2) and return a context from it. The coroutines library provides default Dispatchers (which are themselves CoroutineContext elements). We will focus on them a bit later; here we’re using the Main dispatcher. After choosing a Dispatcher, we combine it with a Job instance we’ve created (3). We create a new Job at the earliest point of the lifecycle that fits our workflow; here we used onCreate (4). That Job becomes the parent job of every coroutine created during the lifecycle. When we no longer need these coroutines, we call job.cancel() in the appropriate method (5), and because of structured concurrency we can be sure that all child coroutines are cancelled.
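The same idea can be tried outside Android. The sketch below is an assumption-laden stand-in: Screen, create and destroy are hypothetical names that mimic an Activity lifecycle, and Dispatchers.Default replaces Dispatchers.Main because the Main dispatcher needs a UI runtime.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for an Activity: create() and destroy() mimic the lifecycle.
class Screen : CoroutineScope {
    private lateinit var job: Job

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job   // Dispatchers.Main would need an Android/UI runtime

    fun create(): List<Job> {
        job = Job()
        // Every coroutine launched on `this` becomes a child of `job`.
        return List(3) { launch { delay(10_000L) } }
    }

    fun destroy() {
        job.cancel() // cancels the parent and, with it, all children
    }
}

fun main() = runBlocking {
    val screen = Screen()
    val children = screen.create()
    screen.destroy()
    children.joinAll()
    println(children.all { it.isCancelled }) // true
}
```

None of the three children has to be tracked or cancelled individually; cancelling the parent Job is enough.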

Talking about dispatchers, Dispatchers.Default is the default dispatcher; it is backed by at least two threads, up to the number of CPU cores. There are other dispatchers like Main (runs on the main thread), IO (runs on worker threads from a shared pool) and Unconfined (starts immediately on whichever thread it’s called from). These dispatchers, along with the scope, define on which thread each coroutine block runs.

For example, every GlobalScope.launch runs on the Default dispatcher unless told otherwise. With a custom scope, we can put any dispatcher in the coroutine context, and all coroutines will launch on a thread handled by that dispatcher. However, it’s easy to override the dispatcher of a coroutine. Consider the example below.

//1
class Scope : CoroutineScope {
    ...
    launch(context = Dispatchers.IO) { //2
        async(context = Dispatchers.Main) { //Dangerous //3
        }
    }
    ...
}

A more practical example can be found here.

Here, regardless of GlobalScope or any other CoroutineScope (1), we have passed Dispatchers.IO as the context argument of launch (2), indicating that it will run on an IO thread. Here async is a child of launch and would run on an IO thread by default, but we have overridden that by passing the Main dispatcher as its context (3).
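To see which thread a dispatcher actually picks, a small sketch like the following can help. It assumes kotlinx.coroutines on a plain JVM, uses withContext (a library function not used elsewhere in this post) to switch dispatchers for one block, and the helper name threadNameOn is made up. Thread names vary between runs and library versions, so no exact output is claimed.

```kotlin
import kotlinx.coroutines.*

// Returns the name of the thread that runs a block on the given dispatcher.
suspend fun threadNameOn(dispatcher: CoroutineDispatcher): String =
    withContext(dispatcher) { Thread.currentThread().name }

fun main() = runBlocking {
    println("caller:  ${Thread.currentThread().name}")
    println("Default: ${threadNameOn(Dispatchers.Default)}")
    println("IO:      ${threadNameOn(Dispatchers.IO)}")
}
```

The caller line shows the thread that runBlocking occupies, while the other two lines show worker threads chosen by the respective dispatcher.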

This is the basic and important information about Kotlin coroutines. From here we will move towards some advanced topics and some problems the coroutines team wants to solve. Shared mutable state is one of them.

Shared Mutable State: The Problem.

Mutable data is data you can change. For example, if you can mute your speaker, the audio stream is mutable and its volume can be changed to 0. Mutable data, while popular and widely used, is a source of many bugs in multithreaded programming. Data modified midway through execution by another thread can lead to a lot of confusing screens, and Java has traditionally fought this problem with synchronized blocks. Kotlin uses a slightly different approach to solve it, and the basic building block for that is Channels.

Channels

Channels are special coroutine constructs that make it easy to transfer a stream of values between coroutines without sharing mutable state. They are just like Java’s BlockingQueue, which is a thread-safe way to pass messages between threads through a queue, with one difference: operations on a Channel suspend, while operations on a BlockingQueue block.

Channels come in two types: SendChannel, which is used only to send data to other coroutines, and ReceiveChannel, which is used at the receiving end. Channel extends both SendChannel and ReceiveChannel, and thus can be used to both send and receive. By default both of these operations are suspending: send is suspended till a receiver is available, and receive is suspended till a sender sends something.

Let’s see an example of how channels can be used.

//1
@ExperimentalCoroutinesApi
fun CoroutineScope.openChannel(
    interval: Long = 30 * DateUtils.SECOND_IN_MILLIS
): ReceiveChannel<Unit> = /**2**/ produce {
    while (true) {
        //3
        send(Unit)
        //4
        delay(interval)
    }
}

//5
// Must be a suspending CoroutineScope extension: it calls openChannel and withTimeout
@ExperimentalCoroutinesApi
suspend fun CoroutineScope.doSomething(interval: Long = 600L, duration: Long = 6000L) {
    val openChannel = openChannel(interval = interval)
    try {
        withTimeout(timeMillis = duration) {
            //6
            openChannel.consumeEach { doOnTick() }
        }
    } catch (e: TimeoutCancellationException) {
        //7
        openChannel.cancel()
        stopService()
    }
}

Here, we call the openChannel method, which returns a ReceiveChannel (1). We create it using the produce builder (2), which passes a SendChannel to its lambda; using that we can send our data, in our case nothing but Unit, to the receiving end (3). Then we sleep for a while using delay (4). We can keep a reference to the channel (5). Note that the send call above is suspended till this channel is being received from using consumeEach (6). Apart from consumeEach, the channel can also be consumed through an iterator, which iterates once for every send and ends when we terminate the channel (7) once our work is done.

Also, note that we have used withTimeout, which is the coroutines way of defining and handling timeouts.
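For a version that runs without Android classes, here is a minimal sketch assuming kotlinx.coroutines. The names squares and collectSquares are hypothetical; the producer sends a finite stream and the consumer uses a plain for loop, which is the iterator-based alternative to consumeEach.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// Producer: send() suspends until the consumer is ready for the next value.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i * i)
    // the channel is closed automatically when this block ends
}

// Consumer: the for loop ends once the channel is closed.
suspend fun collectSquares(count: Int): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    for (value in squares(count)) received += value
    received
}

fun main() = runBlocking {
    println(collectSquares(5)) // [1, 4, 9, 16, 25]
}
```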

Actors

One more advanced concept in coroutines, with Channels at its core, is Actors. At this point I am still getting my hands wet with Actors, but based on what I know, an Actor is a multithreaded state machine. An Actor has a State and a Channel, and we send it messages through the channel. Those messages can be used to change or query the state, so the actor can operate on its State without exposing it as shared mutable data. By default an Actor processes one message at a time, mailbox style: it handles the message that came first and suspends processing of the next message till the current one is finished.

The closest thing to actors in Java is a thread pool, which the dispatcher behind an actor manages under the hood.
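A counter is the usual first actor. The sketch below assumes the actor builder from kotlinx.coroutines.channels, which current releases mark as obsolete, so treat it as an illustration of the idea and not a recommendation. The message classes and function names are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor

// Messages the actor understands (names invented for this sketch).
sealed class CounterMsg
object Increment : CounterMsg()
class GetCount(val reply: CompletableDeferred<Int>) : CounterMsg()

// The counter lives inside the actor and is only touched while handling a message.
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var count = 0
    for (msg in channel) {          // messages are handled one at a time, in arrival order
        when (msg) {
            is Increment -> count++
            is GetCount -> msg.reply.complete(count)
        }
    }
}

@OptIn(ObsoleteCoroutinesApi::class)
suspend fun countWithActor(senders: Int, perSender: Int): Int = coroutineScope {
    val counter = counterActor()
    // Many coroutines on different threads send increments concurrently.
    withContext(Dispatchers.Default) {
        List(senders) { launch { repeat(perSender) { counter.send(Increment) } } }.joinAll()
    }
    val reply = CompletableDeferred<Int>()
    counter.send(GetCount(reply))
    val total = reply.await()
    counter.close()                 // lets the actor finish so the scope can complete
    total
}

fun main() = runBlocking {
    println(countWithActor(senders = 100, perSender = 100)) // 10000
}
```

No lock or synchronized block is needed, because only the actor's own coroutine ever reads or writes count.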

Before we end, there are some points regarding coroutines which I would like to mention.

Misc

  • Coroutines are designed to be sequential: in a launch block every statement executes sequentially.
    • Only deferred executions started with async are not sequential; they can run in parallel unless they depend on each other.
  • A Job cannot be restarted: once it’s cancelled, it’s gone forever. However, you can create a new Job with the same parameters to use elsewhere.
  • For channels, a coroutine can wait on only one send or receive operation at a time.
    • To wait on several channels at once, use a select clause with onReceive or onSend on each channel.
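The select point can be sketched like this, assuming kotlinx.coroutines. The function name firstOf and the channel names are made up; the channels are given a buffer of one so that the example is deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.select

// Waits on two channels at once and reports whichever can deliver a value.
suspend fun firstOf(users: ReceiveChannel<String>, feeds: ReceiveChannel<String>): String =
    select<String> {
        users.onReceive { value -> "users: $value" }
        feeds.onReceive { value -> "feeds: $value" }
    }

fun main() = runBlocking {
    val users = Channel<String>(capacity = 1)
    val feeds = Channel<String>(capacity = 1)
    feeds.send("Feeds1")            // only `feeds` has something to deliver
    println(firstOf(users, feeds))  // feeds: Feeds1
}
```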

Error Handling

  • There are no separate callbacks in coroutines for error handling. It can be done using normal try/catch/finally blocks.
  • If an exception occurs in launch, it’s thrown instantly and propagates up to the parent.
  • In contrast, an exception occurring in async is held back and thrown where await is called.
  • In a parent-child hierarchy, all children can throw exceptions, but only the first thrown exception is reported in the catch block.
    • Exceptions from the other children can be found in that exception’s suppressed exceptions property.
  • CancellationException and all its subclasses are special-cased: they are thrown when Jobs are cancelled and are thus just messages indicating finished jobs. They’re therefore swallowed rather than re-thrown. However, as in the example above, we can catch these exceptions to react to finished or cancelled jobs.
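The difference between launch and async can be sketched as follows, assuming kotlinx.coroutines. The function names are hypothetical; supervisorScope and a CoroutineExceptionHandler are used here only to keep each failure isolated so that the example can report it.

```kotlin
import kotlinx.coroutines.*

// async: the exception is stored in the Deferred and rethrown by await().
suspend fun failureFromAsync(): String = supervisorScope {
    val deferred = async<String> { throw IllegalStateException("async failed") }
    try {
        deferred.await()
    } catch (e: IllegalStateException) {
        "caught at await: ${e.message}"
    }
}

// launch: the exception is not held back; it goes straight to the scope's handler.
suspend fun failureFromLaunch(): String {
    val reported = CompletableDeferred<String>()
    val handler = CoroutineExceptionHandler { _, e ->
        reported.complete("handler got: ${e.message}")
    }
    val scope = CoroutineScope(SupervisorJob() + handler)
    scope.launch { throw IllegalStateException("launch failed") }
    val message = reported.await()
    scope.cancel() // clean up the helper scope
    return message
}

fun main() = runBlocking {
    println(failureFromAsync())   // caught at await: async failed
    println(failureFromLaunch())  // handler got: launch failed
}
```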

Comparison with RxJava.

  • RxJava v2 introduced dedicated backpressure handling. Backpressure occurs when a sender can emit values faster than the receiver can consume them. However, as coroutines suspend on both sending and receiving, backpressure is hard to even reproduce with coroutines.
  • If you want to find a familiar concept, Observables are almost synonymous with async, as both do background work, while their emissions are synonymous with channels.
  • Observers are synonymous with launch, where we’re supposed to receive the outputs.
  • Talking about operators, many commonly used operators are already available in Kotlin Collections, with or without coroutines, but some operators like debounce are not available in coroutines.

That’s it for coroutines. I learned all of this by reading the posts mentioned in this article and by watching some videos. You can find them by following the links below.

Study Material

THE END.
