将回调函数转为Flow

一、前言

在kotlin中,语言将程序进行了结构化处理,提高了可读性,对于旧的程序逻辑也提供了转换操作,这里记录下如何将回调转为Flow流,优化程序结构

二、代码示例

1、callbackFlow

这里演示callbackFlow的使用方式。callbackFlow属于多次回调可以重复触发,由于内容不是使用Channel进行通信,所以可以使用Channel的相关函数。

   interface Listener{
    
    
        fun listener()
        fun end()
    }
inner class TouchModel{
    
    
        private var listener: Listener ?= null
        fun registerListener(sourceListener: Listener){
    
    
            listener = sourceListener
        }
        fun unregisterListener(){
    
    
            listener = null
        }

        fun emit(){
    
    
            listener?.listener()
        }
        fun end(){
    
    
            listener?.end()
        }
    }
   @Test
    fun test(){
    
    
        val model = TouchModel()
        runBlocking {
    
    

            val flow = flowFrom(model)

            flow.onEach {
    
    
                println("YM--->流:$it")
            }.launchIn(this)
            delay(1000)
            model.emit()
            delay(1000)
            model.emit()
            delay(1000)
            model.emit()
            delay(1000)
            println("YM--->流即将结束")
            model.end()
            delay(1000)

        }
    }
    //callbackFlow属于多次回调可以重复触发,由于内容不是使用Channel进行通信,所以可以使用Channel的相关函数
    fun flowFrom(model: TouchModel): Flow<Int> = callbackFlow {
    
    
        var count = 0
        val callback = object : Listener{
    
    
            override fun listener() {
    
    
//  为了避免阻塞,channel可以配置缓冲通道,这个暂时不知道怎么处理
//                trySend(count)//这两种方式都行
                    trySendBlocking(count)
                        .onFailure {
    
     throwable ->
                            // Downstream has been cancelled or failed, can log here
                        }
                    count++
            }

            override fun end() {
    
    
                //当执行结束后可以使用以下方式关闭channel,或者抛出异常,该参数可选,
//                channel.close(IllegalStateException("这个状态不对"))
//                close(IllegalStateException("这个状态不对"))
//                channel.close() 等同于  close()
                println("YM--->Channel关闭")
                close()
            }
        }
        model.registerListener(callback)
        //因为是冷流,所以需要使用awaitClose进行挂起阻塞
        awaitClose {
    
    
            //关闭注册
            println("YM--->解除注册")
            model.unregisterListener()
        }
    }

2、suspendCancellableCoroutine

如果对于单次回调的的话。可以使用suspendCancellableCoroutine进行处理。示例代码如下:

      interface Listener{
    
    
        fun listener()
        fun end()
    }

    inner class TouchModel{
    
    
        private var listener: Listener ?= null
        fun registerListener(sourceListener: Listener){
    
    
            listener = sourceListener
        }
        fun unregisterListener(){
    
    
            listener = null
        }

        fun emit(){
    
    
            listener?.listener()
        }
        fun end(){
    
    
            listener?.end()
        }
    }
       @Test
    fun test(){
    
    
              val model = TouchModel()
        runBlocking {
    
    
//            val flow = flowFrom(model)
            val job = async {
    
    
                val flow = awaitCallback(model)
                println("YM--->流:$flow")
            }
//            delay(1000)
//            model.emit() //解开注释就可以看到流发射的情况
            delay(1000)
            println("YM--->流即将结束")
            model.end()
//            job.cancel()//该流是可以撤销的,假若里面任务还没有结束,这个任务可以直接撤销
            delay(1000)
        }
    }
  suspend fun awaitCallback(model: TouchModel): Int = suspendCancellableCoroutine {
    
     continuation ->
        val callback = object : Listener {
    
     // Implementation of some callback interface
            override fun listener() {
    
    
                continuation.resume(0){
    
    //协程恢复时候使用
                    continuation.resumeWithException(it)
                }
//                continuation.resumeWithException(cause)
       println("YM---->isActive:${
      
      continuation.isActive}--->isCancel:${
      
      continuation.isCancelled}")
            }

            override fun end() {
    
    
                continuation.cancel()
            }
        }
        // Register callback with an API
        model.registerListener(callback)
        // Remove callback on cancellation
        continuation.invokeOnCancellation {
    
    
            println("YM---->挂起关闭")
            model.unregisterListener()
        }
        // At this point the coroutine is suspended by suspendCancellableCoroutine until callback fires
    }

可以看到,执行一次就直接终止了,需要注意的是倘若任务没有执行完,直接进行continuation.cancel()。那么就会执行continuation.invokeOnCancellation函数。倘若,已经执行完再次进行continuation.cancel()。则不会执行continuation.invokeOnCancellation

3、CompletableDeferred

但是这个有个问题就是撤销的话不太好做关闭资源的操作,具体可以参考链接的第四条
这个也可以监听将回调函数进行转换,如下:

class CompletableDeferredTest {
    
    
    val response = CompletableDeferred<Int>()
    @Test
    fun test(){
    
    
        request(response)
        runBlocking {
    
    
            val result = response.await()
            println("YM---->结果:${
      
      result}")
//            response.cancel() //如果在结果返回前执行撤销,那么就会触发CompletableDeferred.invokeOnCompletion()函数
            delay(4000)
        }
    }

     fun request(rep: CompletableDeferred<Int>){
    
    

         Thread{
    
    //这里用线程而不用协程主要是想证明这个函数不需要协程环境就可以执行
             Thread.sleep(1000)//延迟两秒模拟请求
             rep.complete(2)
         }.start()
//         rep.completeExceptionally(IllegalStateException("非法状态异常"))//这个可以抛出异常
         rep.invokeOnCompletion {
    
    
             if (rep.isCancelled) {
    
    
                 println("Call cancelled")
             }
         }
    }

}

三、参考链接

  1. callbackFlow

  2. 使用更为安全的方式收集 Android UI 数据流

  3. Kotlin–suspendCancellableCoroutine和suspendCoroutine的区别及使用_Th.one的博客-CSDN博客

  4. CoroutineScope - CompletableDeferred cancellation

猜你喜欢

转载自blog.csdn.net/Mr_Tony/article/details/126119816