永州网,内容丰富有趣,生活中的好帮手!
永州网 > 百科 > 正文

Java中如何使用Avro值将两个Kafka流结合并生成结果

时间:2024-03-02

本文将介绍如何使用Kafka Streams库来实现这一目标

友情提示:本文共有 3627 个字,阅读大概需要 8 分钟。

在Java中,结合两个Kafka流并将它们与Avro值结合在一起是一个常见的任务。本文将介绍如何使用Kafka Streams库来实现这一目标。我们将讨论如何将两个流合并为一个新的流,并将结果发布到具有Avro值的主题中。我们还将探讨如何在Java中使用Avro库来处理流的序列化和反序列化。通过本文的指导,读者将了解如何利用Java和Kafka Streams来处理流数据,并将其结合在一起并产生具有Avro值的结果。

我有两个Kafka Streams,它们具有String键和我使用KSQL创建的Avro格式的值.

这是第一个:

DESCRIBE EXTENDED STREAM_1; Type : STREAMKey field : IDUSERTimestamp field : Not set - using <ROWTIME>Key format : STRINGValue format : AVROKafka output topic : STREAM_1 (partitions: 4, replication: 1) Field | Type-------------------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) FIRSTNAME | VARCHAR(STRING) LASTNAME | VARCHAR(STRING) IDUSER | VARCHAR(STRING)

第二个:

DESCRIBE EXTENDED STREAM_2;Type : STREAMKey field : IDUSERTimestamp field : Not set - using <ROWTIME>Key format : STRINGValue format : AVROKafka output topic : STREAM_2 (partitions: 4, replication: 1) Field | Type-------------------------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) USERNAME | VARCHAR(STRING) IDUSER | VARCHAR(STRING) DEVICE | VARCHAR(STRING)

所需的输出应包括IDUSER,LASTNAME,DEVICE和USERNAME.

我想使用Streams API离开(在IDUSER上)加入这些流,并将输出写入kafka主题.

为此,我尝试了以下方法:

public static void main(String[] args) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-strteams"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final Serde<String> stringSerde = Serdes.String(); final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde(); boolean isKeySerde = false; genericAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), isKeySerde); KStreamBuilder builder = new KStreamBuilder(); KStream<String, GenericRecord> left = builder.stream("STREAM_1"); KStream<String, GenericRecord> right = builder.stram("STREAM_2"); // Java 8 example, using lambda expressions KStream<String, GenericRecord> joined = left.leftJoin(right, (leftValue, rightValue) -> "left=" leftValue ", right=" rightValue, /* ValueJoiner */ JoinWindows.of(TimeUnit.MINUTES.toMillis(5)), Joined.with( stringSerde, /* key */ genericAvroSerde, /* left value */ genericAvroSerde) /* right value */ ); joined.to(stringSerde, genericAvroSerde, "streams-output-testing"); KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.cleanUp(); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close));}

然而,

KStream<String, GenericRecord> joined = ...

在我的IDE上抛出错误:

incompatible types: inference variable VR has incompatible bounds

当我尝试将String Serde用作键和值时,它可以工作,但数据不能从kafka-console-consumer读取.我要做的是以AVRO格式生成数据,以便能够使用kafka-avro-console-consumer读取它们.

解决方法:

我的第一个猜测是您要从join操作返回一个String,而您的代码期望将GenericRecord作为结果:

KStream<String, GenericRecord> joined = left.leftJoin(right, (leftValue, rightValue) -> "left=" leftValue ", right=" rightValue, ...)

请注意,联接的类型为KStream< String,GenericRecord&gt ;,即值的类型为GenericRecord,但是联接输出是通过类型为String的“ left =“ leftValue”,right =“ rightValue计算的.

来源:/content-1-519501.html

收集不易,本文《Java中如何使用Avro值将两个Kafka流结合并生成结果》知识如果对你有帮助,请点赞收藏并留下你的评论。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。
显示评论内容(5)
  1. L8迷茫者2024-03-02 10:52L8迷茫者[安徽省网友]110.75.37.18
    对于刚入门Java的我来说,这个功能看起来有点复杂,但我会努力学习并尝试使用。
    顶0踩0
  2. D区俊秀2024-03-02 10:47D区俊秀[江西省网友]150.242.51.79
    Java中操作Avro值确实可以简化处理流数据的过程,很棒的技巧。
    顶10踩0
  3. ζ我爱你爱你一心一意2024-03-02 10:41ζ我爱你爱你一心一意[广西网友]203.55.3.1
    这个功能对于我目前的项目来说非常适用,我会尽快尝试一下。
    顶7踩0
  4. 整理背包2024-03-02 10:36整理背包[江苏省网友]203.56.187.121
    我之前就困惑如何将两个Kafka流结合起来,现在终于有了解决方案,感谢分享。
    顶32踩0
  5. 旋死神•孤独忆•暗影徒2024-03-02 10:30旋死神•孤独忆•暗影徒[北京市网友]103.242.134.134
    Java中使用Avro值将两个Kafka流结合是一个很有用的功能,可以更方便地处理数据。
    顶1踩0
相关阅读
探讨尕字的发音和含义

探讨尕字的发音和含义

尕这个汉字很不常见,甚至很多人不知道这个字的存在,这个字就由小字和乃字组合而成,拆开知道怎么读,合在一起尕怎么读什么意思,接下来为你揭晓

2023-12-14 #文学

震夜求生:如何在睡梦中应对强震?

震夜求生:如何在睡梦中应对强震?

睡觉时遭遇强震如何自救?枕头始终保护着后脑,枕头两侧贴紧两只耳朵

2023-12-19 #推荐

如何有效灭除花盆里的小蜈蚣?蜈蚣对花有危害吗?

如何有效灭除花盆里的小蜈蚣?蜈蚣对花有危害吗?

比如花盆里面如果有出现小蜈蚣的话,虽然这种蜈蚣可以吃掉花卉植物的虫害,但是它也是会爬出来的还会咬到人,我们养花碰到的话要小心

2024-01-25 #知识

西门子洗碗机SJ456的排水性能如何?

西门子洗碗机SJ456的排水性能如何?

其强力马达和精密设计确保了水流的顺畅,有效防止管道堵塞和水渍残留

2024-01-30 #推荐

激发灵感 畅言情怀:理想激励我的作文精选

激发灵感 畅言情怀:理想激励我的作文精选

这三篇作文将深入探讨理想对青少年的启发和激励作用,讲述了作者在追寻理想道路上的艰辛与收获

2024-02-01 #随笔

重拾友谊:500字作文素材大全

重拾友谊:500字作文素材大全

友情的重要性在生活中不可言喻,作文中将涵盖友谊的定义、我如何失去友谊以及如何重新找回友谊等方面

2024-02-06 #随笔

探讨Tomcat安装及配置的软件开发攻略

探讨Tomcat安装及配置的软件开发攻略

Tomcat是一个开源的Web服务器和Java Servlet容器,广泛应用于Java开发环境中

2024-02-11 #百科

用心制作 美丽的星期天早餐美味又好吃

用心制作 美丽的星期天早餐美味又好吃

在这篇文章中,我们将分享如何制作美味的星期天早餐,为你的早晨带来一丝温馨和美好

2024-02-20 #生活

十字绣中常见问题与解决方法解析

十字绣中常见问题与解决方法解析

选线问题可能导致绣品色差,解决办法可以是在强光下比色,或使用专业的线圈

2024-02-21 #推荐