• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Scala Callback类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.kafka.clients.producer.Callback的典型用法代码示例。如果您正苦于以下问题:Scala Callback类的具体用法?Scala Callback怎么用?Scala Callback使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Callback类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: sendToKafkaWithNewProducer

//设置package包名称以及导入依赖的类
package pl.touk.nussknacker.engine.kafka

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

trait EspSimpleKafkaProducer {
  val kafkaConfig: KafkaConfig

  def sendToKafkaWithNewProducer(topic: String, key: Array[Byte], value: Array[Byte]): Future[RecordMetadata] = {
    var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
    try {
      producer = createProducer()
      sendToKafka(topic, key, value)(producer)
    } finally {
      if (producer != null) {
        producer.close()
      }
    }
  }

  //method with such signature already exists in "net.cakesolutions" %% "scala-kafka-client" % "0.9.0.0" but I struggled to add this dependency...
  def sendToKafka(topic: String, key: Array[Byte], value: Array[Byte])(producer: KafkaProducer[Array[Byte], Array[Byte]]): Future[RecordMetadata] = {
    val promise = Promise[RecordMetadata]()
    producer.send(new ProducerRecord(topic, key, value), producerCallback(promise))
    promise.future
  }

  def createProducer(): KafkaProducer[Array[Byte], Array[Byte]] = {
    new KafkaProducer[Array[Byte], Array[Byte]](KafkaEspUtils.toProducerProperties(kafkaConfig))
  }

  private def producerCallback(promise: Promise[RecordMetadata]): Callback =
    new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        val result = if (exception == null) Success(metadata) else Failure(exception)
        promise.complete(result)
      }
    }
} 
开发者ID:TouK,项目名称:nussknacker,代码行数:41,代码来源:EspSimpleKafkaProducer.scala


示例2: RecordCallback

//设置package包名称以及导入依赖的类
package articlestreamer.kafka

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

class RecordCallback extends Callback {

  override def onCompletion(metadata: RecordMetadata, ex: Exception) = {
    if (ex != null) {
      handleException(ex)
    } else {
      println(s"Successfully sent message : $metadata")
    }
  }
  
  def handleException(exception: Exception): Unit = {
    Console.err.println(s"Error while attempting to send message : $exception")
  }
} 
开发者ID:firens,项目名称:article-streamer-aggregator,代码行数:19,代码来源:RecordCallback.scala


示例3: SimpleProducer

//设置package包名称以及导入依赖的类
package de.choffmeister.microserviceutils.kafka

import akka.NotUsed
import akka.kafka.ProducerMessage.Result
import akka.kafka.{ProducerMessage, ProducerSettings}
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}

import scala.concurrent.{Future, Promise}

class SimpleProducer[K, V](producerSettings: ProducerSettings[K, V]) {
  private val producer = producerSettings.createKafkaProducer()

  def send(record: ProducerRecord[K, V]): Future[Result[K, V, NotUsed]] = {
    val promise = Promise[Result[K, V, NotUsed]]
    val callback = new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        Option(exception) match {
          case Some(err) => promise.failure(err)
          case None => promise.success(Result(metadata, ProducerMessage.Message(record, NotUsed)))
        }
      }
    }

    producer.send(record, callback)
    promise.future
  }
} 
开发者ID:choffmeister,项目名称:microservice-utils,代码行数:28,代码来源:SimpleProducer.scala


示例4: Bidder

//设置package包名称以及导入依赖的类
package bidding.client.console

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

class Bidder(producer: KafkaProducer[String, String], itemId: String, startPrice: BigDecimal) {
  private var lastPrice: BigDecimal = startPrice

  private val callback = new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      println("published: " + metadata.toString)
    }
  }

  def bid(): Unit = {
    val producerRecord = new ProducerRecord("my-replicated-topic", itemId, lastPrice.toString())
    producer.send(producerRecord, callback)
    lastPrice += BigDecimal(scala.util.Random.nextDouble * 2)
  }
} 
开发者ID:oleksandr-iskhakov,项目名称:bidding-server,代码行数:20,代码来源:Bidder.scala


示例5: RecordCallback

//设置package包名称以及导入依赖的类
package articlestreamer.shared.kafka

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

class RecordCallback extends Callback with LazyLogging {

  override def onCompletion(metadata: RecordMetadata, ex: Exception) = {
    if (ex != null) {
      handleException(ex)
    } else {
      logger.info(s"Successfully sent message : $metadata")
    }
  }
  
  private def handleException(exception: Exception): Unit = {
    logger.error("Error while attempting to send message", exception)
  }
} 
开发者ID:firens,项目名称:article-streamer,代码行数:20,代码来源:RecordCallback.scala



注:本文中的org.apache.kafka.clients.producer.Callback类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala LogManager类代码示例发布时间:2022-05-23
下一篇:
Scala VisibleForTesting类代码示例发布时间:2022-05-23
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap