• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java KTable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.kafka.streams.kstream.KTable的典型用法代码示例。如果您正苦于以下问题:Java KTable类的具体用法?Java KTable怎么用?Java KTable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



KTable类属于org.apache.kafka.streams.kstream包,在下文中一共展示了KTable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: doFilter

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
                              final StateStoreSupplier<KeyValueStore> storeSupplier,
                              boolean isFilterNot) {
    Objects.requireNonNull(predicate, "predicate can't be null");
    String name = topology.newName(FILTER_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, isFilterNot, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
    }
    return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KTableImpl.java


示例2: doMapValues

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                       final Serde<V1> valueSerde,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(mapper);
    String name = topology.newName(MAPVALUES_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
    } else {
        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KTableImpl.java


示例3: joinTopology

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
public static KStreamBuilder joinTopology(KStreamBuilder builder) {
    KStream<String, Integer> kStreamA = builder.stream(stringSerde, integerSerde, INPUT_TOPIC_A);
    KStream<String, Integer> kStreamB = builder.stream(stringSerde, integerSerde, INPUT_TOPIC_B);

    KTable<String, Integer> table = kStreamA
        .groupByKey(stringSerde, integerSerde)
        .aggregate(() -> 0, (k, v, t) -> v, integerSerde, STORAGE_NAME);

    kStreamB
        .leftJoin(table, (v1, v2) -> v1 + v2, stringSerde, integerSerde)
        .to(stringSerde, integerSerde, OUTPUT_TOPIC_A);

    kStreamB
        .leftJoin(table, (v1, v2) -> v1 - v2, stringSerde, integerSerde)
        .to(stringSerde, integerSerde, OUTPUT_TOPIC_B);

    return builder;
}
 
开发者ID:carlosmenezes,项目名称:mockafka,代码行数:19,代码来源:TopologyUtil.java


示例4: aggregate

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final StateStoreSupplier<WindowStore> storeSupplier) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    Objects.requireNonNull(windows, "windows can't be null");
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return (KTable<Windowed<K>, T>) doAggregate(
            new KStreamWindowAggregate<>(windows, storeSupplier.name(), initializer, aggregator),
            AGGREGATE_NAME,
            storeSupplier
    );
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:17,代码来源:KGroupedStreamImpl.java


示例5: doAggregate

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
private <T> KTable<K, T> doAggregate(
        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
        final String functionName,
        final StateStoreSupplier storeSupplier) {

    final String aggFunctionName = topology.newName(functionName);

    final String sourceName = repartitionIfRequired(storeSupplier.name());

    topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, aggFunctionName);

    return new KTableImpl<>(topology,
            aggFunctionName,
            aggregateSupplier,
            sourceName.equals(this.name) ? sourceNodes
                    : Collections.singleton(sourceName),
            storeSupplier.name(),
            isQueryable);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImpl.java


示例6: join

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
private KTable<String, String> join(final KTable<String, String> first,
                                    final KTable<String, String> second,
                                    final JoinType joinType,
                                    final String queryableName) {
    final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
        @Override
        public String apply(final String value1, final String value2) {
            return value1 + "-" + value2;
        }
    };

    switch (joinType) {
        case INNER:
            return first.join(second, joiner, Serdes.String(), queryableName);
        case LEFT:
            return first.leftJoin(second, joiner, Serdes.String(), queryableName);
        case OUTER:
            return first.outerJoin(second, joiner, Serdes.String(), queryableName);
    }

    throw new RuntimeException("Unknown join type.");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:23,代码来源:KTableKTableJoinIntegrationTest.java


示例7: testJoin

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testJoin() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();

    final int[] expectedKeys = new int[]{0, 1, 2, 3};

    final KTable<Integer, String> table1;
    final KTable<Integer, String> table2;
    final KTable<Integer, String> joined;
    final MockProcessorSupplier<Integer, String> processor;

    processor = new MockProcessorSupplier<>();
    table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
    joined.toStream().process(processor);

    doTestJoin(builder, expectedKeys, processor, joined);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableKTableJoinTest.java


示例8: testNotSendingOldValues

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testNotSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();

    final int[] expectedKeys = new int[]{0, 1, 2, 3};

    final KTable<Integer, String> table1;
    final KTable<Integer, String> table2;
    final KTable<Integer, String> joined;
    final MockProcessorSupplier<Integer, String> proc;

    table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
    proc = new MockProcessorSupplier<>();
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);

}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KTableKTableJoinTest.java


示例9: testQueryableNotSendingOldValues

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testQueryableNotSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();

    final int[] expectedKeys = new int[]{0, 1, 2, 3};

    final KTable<Integer, String> table1;
    final KTable<Integer, String> table2;
    final KTable<Integer, String> joined;
    final MockProcessorSupplier<Integer, String> proc;

    table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
    proc = new MockProcessorSupplier<>();
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);

}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KTableKTableJoinTest.java


示例10: testSendingOldValues

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testSendingOldValues() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();

    final int[] expectedKeys = new int[]{0, 1, 2, 3};

    final KTable<Integer, String> table1;
    final KTable<Integer, String> table2;
    final KTable<Integer, String> joined;
    final MockProcessorSupplier<Integer, String> proc;

    table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);

    proc = new MockProcessorSupplier<>();
    builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);

    doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);

}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KTableKTableJoinTest.java


示例11: testKTable

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testKTable() {
    final KStreamBuilder builder = new KStreamBuilder();

    String topic1 = "topic1";

    KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");

    MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
    table1.toStream().process(proc1);

    driver = new KStreamTestDriver(builder, stateDir);
    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 2);
    driver.process(topic1, "C", 3);
    driver.process(topic1, "D", 4);
    driver.flushState();
    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    driver.flushState();

    assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:24,代码来源:KTableSourceTest.java


示例12: testKTable

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testKTable() {
    final KStreamBuilder builder = new KStreamBuilder();

    String topic1 = "topic1";

    KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
    KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
        @Override
        public Integer apply(CharSequence value) {
            return value.charAt(0) - 48;
        }
    });

    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    table2.toStream().process(proc2);

    doTestKTable(builder, topic1, proc2);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableMapValuesTest.java


示例13: testQueryableKTable

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void testQueryableKTable() {
    final KStreamBuilder builder = new KStreamBuilder();

    String topic1 = "topic1";

    KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
    KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
        @Override
        public Integer apply(CharSequence value) {
            return value.charAt(0) - 48;
        }
    }, Serdes.Integer(), "anyName");

    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    table2.toStream().process(proc2);

    doTestKTable(builder, topic1, proc2);
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KTableMapValuesTest.java


示例14: shouldCountSessionWindowsWithInternalStoreName

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, Long> results = new HashMap<>();
    KTable table = groupedStream.count(SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, Long>() {
        @Override
        public void apply(final Windowed<String> key, final Long value) {
            results.put(key, value);
        }
    });
    doCountSessionWindows(results);
    assertNull(table.queryableStoreName());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KGroupedStreamImplTest.java


示例15: shouldReduceSessionWindows

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindows() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    KTable table = groupedStream.reduce(
            new Reducer<String>() {
                @Override
                public String apply(final String value1, final String value2) {
                    return value1 + ":" + value2;
                }
            }, SessionWindows.with(30),
            "session-store");
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    assertEquals(table.queryableStoreName(), "session-store");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:21,代码来源:KGroupedStreamImplTest.java


示例16: shouldReduceSessionWindowsWithInternalStoreName

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
    final Map<Windowed<String>, String> results = new HashMap<>();
    KTable table = groupedStream.reduce(
            new Reducer<String>() {
                @Override
                public String apply(final String value1, final String value2) {
                    return value1 + ":" + value2;
                }
            }, SessionWindows.with(30));
    table.foreach(new ForeachAction<Windowed<String>, String>() {
        @Override
        public void apply(final Windowed<String> key, final String value) {
            results.put(key, value);
        }
    });
    doReduceSessionWindows(results);
    assertNull(table.queryableStoreName());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:20,代码来源:KGroupedStreamImplTest.java


示例17: shouldReduce

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void shouldReduce() throws Exception {
    final String topic = "input";
    final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
        new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
            @Override
            public KeyValue<String, Integer> apply(String key, Number value) {
                return KeyValue.pair(key, value.intValue());
            }
        };

    final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
        .groupBy(intProjection)
        .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");

    doShouldReduce(reduced, topic);
    assertEquals(reduced.queryableStoreName(), "reduced");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KGroupedTableImplTest.java


示例18: shouldReduceWithInternalStoreName

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Test
public void shouldReduceWithInternalStoreName() throws Exception {
    final String topic = "input";
    final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
        new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
            @Override
            public KeyValue<String, Integer> apply(String key, Number value) {
                return KeyValue.pair(key, value.intValue());
            }
        };

    final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
        .groupBy(intProjection)
        .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);

    doShouldReduce(reduced, topic);
    assertNull(reduced.queryableStoreName());
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:19,代码来源:KGroupedTableImplTest.java


示例19: doTestKTable

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2,
                          final KTable<String, Integer> table3, final String topic1) {
    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
    table2.toStream().process(proc2);
    table3.toStream().process(proc3);

    driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());

    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 2);
    driver.process(topic1, "C", 3);
    driver.process(topic1, "D", 4);
    driver.flushState();
    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    driver.flushState();

    proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
    proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:22,代码来源:KTableFilterTest.java


示例20: join

import org.apache.kafka.streams.kstream.KTable; //导入依赖的package包/类
@Override
public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                  final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                  final Serde<K> keySerde,
                                  final Serde<V> valueSerde) {
    if (repartitionRequired) {
        final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde,
            valueSerde, null);
        return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
    } else {
        return doStreamTableJoin(other, joiner, false);
    }
}
 
开发者ID:YMCoding,项目名称:kafka-0.11.0.0-src-with-comment,代码行数:14,代码来源:KStreamImpl.java



注:本文中的org.apache.kafka.streams.kstream.KTable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Runtime类代码示例发布时间:2022-05-21
下一篇:
Java AccelSurface类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap