requestsStream =
+ consumer.subscribe(
+ new StreamObserver<>() {
+ @Override
+ public void onNext(SubscriptionStreamResponse response) {
+ response.getNotifications().getNotificationsList().stream()
+ .map(
+ n -> {
+ try {
+ return MAPPER.readValue(
+ n.getMessage().getPayload().toByteArray(), User.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .forEach(resultAcceptor);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ throw new RuntimeException("Stream observer received error", t);
+ }
+
+ @Override
+ public void onCompleted() {}
+ });
+ requestsStream.onNext(
+ SubscriptionStreamRequest.newBuilder()
+ .setSubscribeRequest(SubscriptionRequest.newBuilder().setCursor("").setQueue(queue))
+ .build());
+ }
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClient.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClient.java
new file mode 100644
index 00000000..4a744846
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClient.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.testcontainers.containers.utils.pojo.User;
+
+/**
+ * Version-specific gRPC client for publishing and subscribing to a TQE queue. TQE 2.x uses {@code
+ * PublisherServiceGrpc} + unidirectional streaming; TQE 3.x uses {@code ProducerGrpc} +
+ * bidirectional streaming.
+ *
+ * The gRPC channel is bound at construction time and is intentionally not part of the method
+ * signatures: it is an internal transport detail, not part of the client contract.
+ */
+interface TQEClient {
+
+ /**
+ * Publishes a batch of users to the given queue. Synchronous: throws on failure.
+ *
+ * @param users the messages to publish
+ * @param queue the target queue name
+ */
+ void publish(List users, String queue) throws Exception;
+
+ /**
+ * Starts a subscription on the given queue. Messages are delivered asynchronously and forwarded
+ * to {@code resultAcceptor} as they arrive. This call only kicks off the subscription; callers
+ * should retry/poll until enough messages have been received.
+ *
+ * @param queue the queue to subscribe to
+ * @param resultAcceptor callback invoked once per received message
+ */
+ void subscribe(String queue, Consumer resultAcceptor);
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java
new file mode 100644
index 00000000..646a6382
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterFixture.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannel;
+import org.testcontainers.containers.tqe.GrpcContainer;
+import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole;
+import org.testcontainers.containers.tqe.TQECluster;
+import org.testcontainers.containers.tqe.TQEClusterImpl;
+import org.testcontainers.containers.tqe.configuration.TQEConfigurator;
+
+/**
+ * Encapsulates the lifecycle of a {@link TQECluster} for a single test: builds the configurator,
+ * creates the cluster, starts it, resolves gRPC containers by role, and exposes pre-bound {@link
+ * TQEClient} instances. {@link #close()} stops the cluster and shuts down all gRPC channels.
+ */
+final class TQEClusterFixture implements AutoCloseable {
+
+ private final TQEVersion version;
+ private final TQEConfigurator configurator;
+ private final TQECluster cluster;
+ private final ManagedChannel publisherChannel;
+ private final ManagedChannel consumerChannel;
+ private final TQEClient publisherClient;
+ private final TQEClient consumerClient;
+
+ TQEClusterFixture(TQEVersion version) {
+ this.version = version;
+ this.configurator =
+ version.configuratorBuilder(version.queueConfig(), Set.of(version.grpcConfig())).build();
+ this.cluster = new TQEClusterImpl(configurator);
+ this.cluster.start();
+ this.publisherChannel = createReadyChannel(findByRole(version.producerRole()));
+ this.consumerChannel = createReadyChannel(findByRole(GrpcRole.CONSUMER));
+ this.publisherClient = version.client(publisherChannel);
+ this.consumerClient = version.client(consumerChannel);
+ }
+
+ TQEVersion version() {
+ return version;
+ }
+
+ TQEClient publisherClient() {
+ return publisherClient;
+ }
+
+ TQEClient consumerClient() {
+ return consumerClient;
+ }
+
+ void restart(long delayBefore, TimeUnit unitBefore, long delayAfter, TimeUnit unitAfter)
+ throws InterruptedException {
+ this.cluster.restart(delayBefore, unitBefore, delayAfter, unitAfter);
+ }
+
+ @Override
+ public void close() {
+ try {
+ publisherChannel.shutdownNow();
+ consumerChannel.shutdownNow();
+ } finally {
+ this.cluster.stop();
+ }
+ }
+
+ private GrpcContainer> findByRole(GrpcRole role) {
+ return this.cluster.grpc().values().stream()
+ .filter(g -> g.roles().contains(role))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "No gRPC container with role "
+ + role
+ + " in cluster "
+ + cluster.clusterName()));
+ }
+
+ private static ManagedChannel createReadyChannel(GrpcContainer> grpc) {
+ InetSocketAddress address =
+ grpc.grpcAddresses().stream()
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalStateException("No gRPC address on container " + grpc.node()));
+ return TQETestHelper.createReadyChannel(address);
+ }
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java
deleted file mode 100644
index 7633380c..00000000
--- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
- * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
- * All Rights Reserved.
- */
-
-package org.testcontainers.containers.integration.tqe;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-import com.google.protobuf.ByteString;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import org.instancio.Instancio;
-import org.instancio.Select;
-import org.instancio.generators.Generators;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.rnorth.ducttape.unreliables.Unreliables;
-import org.testcontainers.containers.ContainerLaunchException;
-import org.testcontainers.containers.tqe.GrpcContainer;
-import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole;
-import org.testcontainers.containers.tqe.TQECluster;
-import org.testcontainers.containers.tqe.TQEClusterImpl;
-import org.testcontainers.containers.tqe.configuration.FileTQEConfigurator;
-import org.testcontainers.containers.tqe.configuration.TQEConfigurator;
-import org.testcontainers.containers.utils.pojo.User;
-import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
-import tarantool.queue_ee.Consumer.SubscriptionRequest;
-import tarantool.queue_ee.Consumer.SubscriptionStreamRequest;
-import tarantool.queue_ee.Consumer.SubscriptionStreamResponse;
-import tarantool.queue_ee.ConsumerServiceGrpc;
-import tarantool.queue_ee.ConsumerServiceGrpc.ConsumerServiceStub;
-import tarantool.queue_ee.ProducerGrpc;
-import tarantool.queue_ee.ProducerGrpc.ProducerBlockingStub;
-import tarantool.queue_ee.ProducerOuterClass.ProduceMessage;
-import tarantool.queue_ee.ProducerOuterClass.ProduceRequest;
-
-class TQEClusterImplTest extends CommonTest {
-
- @RepeatedTest(10)
- void testMultiplyRestart() throws Exception {
- try (TQEConfigurator configurator =
- FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG))
- .build();
- TQECluster cluster = new TQEClusterImpl(configurator)) {
- cluster.start();
- }
- }
-
- @Test
- void testRestartMethod() throws Exception {
- try (TQEConfigurator configurator =
- FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG))
- .build();
- TQECluster cluster = new TQEClusterImpl(configurator)) {
- cluster.start();
- cluster.restart(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
- }
- }
-
- public static Stream dataForTestInvalidQueueConfigShouldThrow() {
- final List invalidConfigs =
- Arrays.asList(
- // no required test-super user
- """
- # Credentials
- credentials:
- users:
- admin:
- password: 'secret-cluster-cookie'
- roles: [ super ]
- replicator:
- password: 'secret'
- roles: [ replication ]
- storage:
- roles: [ sharding ]
- password: storage
-
- # advertise configs for all nodes
- iproto:
- advertise:
- peer:
- login: replicator
- sharding:
- login: storage
- password: storage
-
- roles: [ roles.metrics-export ]
- # queues configs
- roles_cfg:
- app.roles.queue:
- queues:
- - name: test
- deduplication_mode: keep_latest
- disabled_filters_by: [ sharding_key ]
- roles.metrics-export:
- http:
- - listen: 8081
- endpoints:
- - format: prometheus
- path: '/metrics'
-
- groups:
- routers:
- replicasets:
- r-1:
- sharding:
- roles: [ router ]
- roles: [ app.roles.api ]
- instances:
- router:
- iproto:
- listen:
- - uri: router:3301
- storages:
- replicasets:
- shard-1:
- replication:
- failover: manual
- sharding:
- roles: [ storage ]
- leader: master
- instances:
- master:
- iproto:
- listen:
- - uri: master:3301
- net_msg_max: 768
- """,
- // no consumer storage to connect from grpc
- """
- # Credentials
- credentials:
- users:
- test-super:
- password: 'test'
- roles: [ super ]
- admin:
- password: 'secret-cluster-cookie'
- roles: [ super ]
- replicator:
- password: 'secret'
- roles: [ replication ]
- storage:
- roles: [ sharding ]
- password: storage
-
- # advertise configs for all nodes
- iproto:
- advertise:
- peer:
- login: replicator
- sharding:
- login: storage
- password: storage
-
- roles: [ roles.metrics-export ]
- # queues configs
- roles_cfg:
- app.roles.queue:
- queues:
- - name: test
- deduplication_mode: keep_latest
- disabled_filters_by: [ sharding_key ]
- roles.metrics-export:
- http:
- - listen: 8081
- endpoints:
- - format: prometheus
- path: '/metrics'
-
- groups:
- routers:
- replicasets:
- r-1:
- sharding:
- roles: [ router ]
- roles: [ app.roles.api ]
- instances:
- router:
- iproto:
- listen:
- - uri: router:3301
- """);
-
- return invalidConfigs.stream()
- .map(
- s -> {
- final Path testConfigPath = TEST_TEMP_DIR.resolve(UUID.randomUUID().toString());
- try {
- Files.writeString(testConfigPath, s);
- return Arguments.of(testConfigPath);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- @ParameterizedTest
- @MethodSource("dataForTestInvalidQueueConfigShouldThrow")
- void testInvalidQueueConfigShouldThrow(Path queueConfig) {
- Assertions.assertThrows(
- ContainerLaunchException.class,
- () -> {
- try (TQEConfigurator configurator =
- FileTQEConfigurator.builder(IMAGE_NAME, queueConfig, Set.of(SIMPLE_GRPC_CONFIG))
- .withStartupTimeout(Duration.ofSeconds(5))
- .build();
- TQECluster cluster = new TQEClusterImpl(configurator)) {
- cluster.start();
- }
- });
- }
-
- public static Stream dataForTestInvalidGrpcConfig() {
- final List invalidGrpcConfigs =
- Arrays.asList(
- """
- core_port: 1111
- grpc_listen:
- - uri: 'tcp://0.0.0.0:18182'
-
- producer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- routers:
- - "unknown:3301"
-
- consumer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- storage:
- - "master:3301"
- """,
- // no consumers and producers
- """
- core_port: 1111
- grpc_listen:
- - uri: 'tcp://0.0.0.0:18182'
-
- producer:
- enabled: false
- tarantool:
- user: test-super
- pass: test
- connections:
- routers:
- - "router:3301"
-
- consumer:
- enabled: false
- tarantool:
- user: test-super
- pass: test
- connections:
- storage:
- - "master:3301"
- """,
- // no core_port parameter
- """
- grpc_listen:
- - uri: 'tcp://0.0.0.0:18182'
-
- producer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- routers:
- - "router:3301"
-
- consumer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- storage:
- - "master:3301"
- """,
- // no listen.uri parameter
- """
- core_port: 1111
-
- producer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- routers:
- - "router:3301"
-
- consumer:
- enabled: true
- tarantool:
- user: test-super
- pass: test
- connections:
- storage:
- - "master:3301"
- """);
-
- return invalidGrpcConfigs.stream()
- .map(
- s -> {
- final Path testConfigPath = TEST_TEMP_DIR.resolve(UUID.randomUUID() + ".yml");
- try {
- Files.writeString(testConfigPath, s);
- return testConfigPath;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- @ParameterizedTest
- @MethodSource("dataForTestInvalidGrpcConfig")
- void testInvalidGrpcConfig(Path grpcConfig) {
- Assertions.assertThrows(
- ContainerLaunchException.class,
- () -> {
- try (TQEConfigurator configurator =
- FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(grpcConfig))
- .withStartupTimeout(Duration.ofSeconds(5))
- .build();
- TQECluster cluster = new TQEClusterImpl(configurator)) {
- cluster.start();
- }
- });
- }
-
- @RepeatedTest(10)
- void testPublishAndConsumeData() {
- Assertions.assertDoesNotThrow(
- () -> {
- final ObjectMapper MAPPER = new ObjectMapper();
-
- try (TQEConfigurator configurator =
- FileTQEConfigurator.builder(
- IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG))
- .build();
- TQECluster cluster = new TQEClusterImpl(configurator)) {
- cluster.start();
-
- final String queueName = "test";
-
- final List> producers =
- cluster.grpc().values().stream()
- .filter(g -> g.roles().contains(GrpcRole.PRODUCER))
- .toList();
- final List> consumers =
- cluster.grpc().values().stream()
- .filter(g -> g.roles().contains(GrpcRole.CONSUMER))
- .toList();
-
- Assertions.assertFalse(producers.isEmpty());
- Assertions.assertFalse(consumers.isEmpty());
-
- final Set grpcAddresses = producers.get(0).grpcAddresses();
- final Set consumerAddresses = consumers.get(0).grpcAddresses();
-
- final Optional publisherAddress = grpcAddresses.stream().findFirst();
- Assertions.assertTrue(publisherAddress.isPresent());
- final Optional consumerAddress =
- consumerAddresses.stream().findFirst();
- Assertions.assertTrue(consumerAddress.isPresent());
-
- final ManagedChannel producerChannel =
- ManagedChannelBuilder.forAddress(
- publisherAddress.get().getHostName(), publisherAddress.get().getPort())
- .usePlaintext()
- .build();
-
- final ManagedChannel consumerChannel =
- ManagedChannelBuilder.forAddress(
- consumerAddress.get().getHostName(), consumerAddress.get().getPort())
- .usePlaintext()
- .build();
-
- final ProducerBlockingStub producer = ProducerGrpc.newBlockingStub(producerChannel);
- final ConsumerServiceStub consumer = ConsumerServiceGrpc.newStub(consumerChannel);
-
- final List users =
- Instancio.ofList(User.class)
- .size(100)
- .generate(
- Select.field(User::getName),
- g -> g.string().alphaNumeric().allowEmpty().nullable())
- .generate(Select.field(User::getAge), Generators::ints)
- .create();
-
- final ProduceRequest.Builder requestBuilder =
- ProduceRequest.newBuilder().setQueue(queueName);
- for (User user : users) {
- requestBuilder.addMessages(
- ProduceMessage.newBuilder()
- .setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user))));
- }
- final ProduceRequest produceRequest = requestBuilder.build();
- producer.produce(produceRequest);
-
- final Set result = new CopyOnWriteArraySet<>();
- StreamObserver requestsStream =
- consumer.subscribe(
- new StreamObserver() {
- @Override
- public void onNext(SubscriptionStreamResponse response) {
- response.getNotifications().getNotificationsList().stream()
- .map(
- n -> {
- try {
- return MAPPER.readValue(
- n.getMessage().getPayload().toByteArray(), User.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .forEach(result::add);
- }
-
- @Override
- public void onError(Throwable t) {}
-
- @Override
- public void onCompleted() {}
- });
- requestsStream.onNext(
- SubscriptionStreamRequest.newBuilder()
- .setSubscribeRequest(
- SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName))
- .build());
-
- Unreliables.retryUntilTrue(
- 5, TimeUnit.SECONDS, () -> new LinkedHashSet<>(users).size() == result.size());
- Assertions.assertEquals(new LinkedHashSet<>(users), result);
- consumerChannel.shutdownNow();
- producerChannel.shutdownNow();
- }
- });
- }
-}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java
new file mode 100644
index 00000000..3570e4ea
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterIntegrationTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import org.instancio.Instancio;
+import org.instancio.Select;
+import org.instancio.generators.Generators;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.utils.pojo.User;
+
+class TQEClusterIntegrationTest {
+
+ private static final String QUEUE_NAME = "test";
+ private static final int USERS_COUNT = 100;
+ private static final int RETRY_TIMEOUT_SECONDS = 60;
+
+ @ParameterizedTest
+ @EnumSource(TQEVersion.class)
+ @DisplayName("publish → subscribe round-trip across TQE versions")
+ void testPublishAndConsumeData(TQEVersion version) {
+ Assertions.assertDoesNotThrow(
+ () -> {
+ try (TQEClusterFixture fx = new TQEClusterFixture(version)) {
+ final List users = generateUsers();
+
+ Unreliables.retryUntilSuccess(
+ RETRY_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS,
+ () -> {
+ fx.publisherClient().publish(users, QUEUE_NAME);
+ return true;
+ });
+
+ final Set result = new CopyOnWriteArraySet<>();
+ Unreliables.retryUntilSuccess(
+ RETRY_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS,
+ () -> {
+ fx.consumerClient().subscribe(QUEUE_NAME, result::add);
+ return true;
+ });
+
+ Unreliables.retryUntilTrue(
+ RETRY_TIMEOUT_SECONDS, TimeUnit.SECONDS, () -> users.size() == result.size());
+ Assertions.assertEquals(new LinkedHashSet<>(users), result);
+ }
+ });
+ }
+
+ private static List generateUsers() {
+ return Instancio.ofList(User.class)
+ .size(USERS_COUNT)
+ .generate(
+ Select.field(User::getName), g -> g.string().alphaNumeric().allowEmpty().nullable())
+ .generate(Select.field(User::getAge), Generators::ints)
+ .create();
+ }
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java
new file mode 100644
index 00000000..d7b25534
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterTest.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.tqe.TQECluster;
+import org.testcontainers.containers.tqe.TQEClusterImpl;
+import org.testcontainers.containers.tqe.configuration.TQEConfigurator;
+
+class TQEClusterTest {
+
+ @ParameterizedTest
+ @EnumSource(TQEVersion.class)
+ void testStartupAndShutdown(TQEVersion version) {
+ try (TQEClusterFixture fx = new TQEClusterFixture(version)) {
+ // Fixture starts the cluster in its constructor; close() stops it.
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(TQEVersion.class)
+ void testRestartMethod(TQEVersion version) throws Exception {
+ try (TQEClusterFixture fx = new TQEClusterFixture(version)) {
+ fx.restart(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
+ }
+ }
+
+ public static Stream dataForTestInvalidQueueConfig() {
+ final List invalidConfigs =
+ Arrays.asList(
+ // no required test-super user
+ """
+ # Credentials
+ credentials:
+ users:
+ admin:
+ password: 'secret-cluster-cookie'
+ roles: [ super ]
+ replicator:
+ password: 'secret'
+ roles: [ replication ]
+ storage:
+ roles: [ sharding ]
+ password: storage
+
+ # advertise configs for all nodes
+ iproto:
+ advertise:
+ peer:
+ login: replicator
+ sharding:
+ login: storage
+ password: storage
+
+ roles: [ roles.metrics-export ]
+ # queues configs
+ roles_cfg:
+ app.roles.queue:
+ queues:
+ - name: test
+ deduplication_mode: keep_latest
+ disabled_filters_by: [ sharding_key ]
+ roles.metrics-export:
+ http:
+ - listen: 8081
+ endpoints:
+ - format: prometheus
+ path: '/metrics'
+
+ groups:
+ routers:
+ replicasets:
+ r-1:
+ sharding:
+ roles: [ router ]
+ roles: [ app.roles.api ]
+ instances:
+ router:
+ iproto:
+ listen:
+ - uri: router:3301
+ storages:
+ replicasets:
+ shard-1:
+ replication:
+ failover: manual
+ sharding:
+ roles: [ storage ]
+ leader: master
+ instances:
+ master:
+ iproto:
+ listen:
+ - uri: master:3301
+ net_msg_max: 768
+ """,
+ // no consumer storage to connect from grpc
+ """
+ # Credentials
+ credentials:
+ users:
+ test-super:
+ password: 'test'
+ roles: [ super ]
+ admin:
+ password: 'secret-cluster-cookie'
+ roles: [ super ]
+ replicator:
+ password: 'secret'
+ roles: [ replication ]
+ storage:
+ roles: [ sharding ]
+ password: storage
+
+ # advertise configs for all nodes
+ iproto:
+ advertise:
+ peer:
+ login: replicator
+ sharding:
+ login: storage
+ password: storage
+
+ roles: [ roles.metrics-export ]
+ # queues configs
+ roles_cfg:
+ app.roles.queue:
+ queues:
+ - name: test
+ deduplication_mode: keep_latest
+ disabled_filters_by: [ sharding_key ]
+ roles.metrics-export:
+ http:
+ - listen: 8081
+ endpoints:
+ - format: prometheus
+ path: '/metrics'
+
+ groups:
+ routers:
+ replicasets:
+ r-1:
+ sharding:
+ roles: [ router ]
+ roles: [ app.roles.api ]
+ instances:
+ router:
+ iproto:
+ listen:
+ - uri: router:3301
+ """);
+
+ return TQEVersion.all()
+ .flatMap(
+ version ->
+ invalidConfigs.stream()
+ .map(
+ s -> {
+ final Path testConfigPath =
+ TQETestHelper.TEST_TEMP_DIR.resolve(UUID.randomUUID().toString());
+ try {
+ Files.writeString(testConfigPath, s);
+ return Arguments.of(version, testConfigPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ @ParameterizedTest
+ @MethodSource("dataForTestInvalidQueueConfig")
+ void testInvalidQueueConfig(TQEVersion version, Path queueConfig) {
+ Assertions.assertThrows(
+ ContainerLaunchException.class,
+ () -> {
+ try (TQEConfigurator configurator =
+ version
+ .configuratorBuilder(queueConfig, Set.of(version.grpcConfig()))
+ .withStartupTimeout(Duration.ofSeconds(5))
+ .build();
+ TQECluster cluster = new TQEClusterImpl(configurator)) {
+ cluster.start();
+ }
+ });
+ }
+
+ public static Stream dataForTestInvalidGrpcConfig() {
+ return TQEVersion.all()
+ .flatMap(
+ version -> {
+ final List invalidGrpcConfigs =
+ Arrays.asList(
+ // unknown host
+ """
+ core_port: 1111
+ grpc_listen:
+ - uri: 'tcp://0.0.0.0:18182'
+
+ %s:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ routers:
+ - "unknown:3301"
+
+ consumer:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ storage:
+ - "master:3301"
+ """
+ .formatted(version.producerRoleName()),
+ // no consumers and producers
+ """
+ core_port: 1111
+ grpc_listen:
+ - uri: 'tcp://0.0.0.0:18182'
+
+ %s:
+ enabled: false
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ routers:
+ - "router:3301"
+
+ consumer:
+ enabled: false
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ storage:
+ - "master:3301"
+ """
+ .formatted(version.producerRoleName()),
+ // no core_port parameter
+ """
+ grpc_listen:
+ - uri: 'tcp://0.0.0.0:18182'
+
+ %s:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ routers:
+ - "router:3301"
+
+ consumer:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ storage:
+ - "master:3301"
+ """
+ .formatted(version.producerRoleName()),
+ // no listen.uri parameter
+ """
+ core_port: 1111
+
+ %s:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ routers:
+ - "router:3301"
+
+ consumer:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ storage:
+ - "master:3301"
+ """
+ .formatted(version.producerRoleName()));
+
+ return invalidGrpcConfigs.stream()
+ .map(
+ s -> {
+ final Path testConfigPath =
+ TQETestHelper.TEST_TEMP_DIR.resolve(UUID.randomUUID() + ".yml");
+ try {
+ Files.writeString(testConfigPath, s);
+ return Arguments.of(version, testConfigPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("dataForTestInvalidGrpcConfig")
+ void testInvalidGrpcConfig(TQEVersion version, Path grpcConfig) {
+ Assertions.assertThrows(
+ ContainerLaunchException.class,
+ () -> {
+ try (TQEConfigurator configurator =
+ version
+ .configuratorBuilder(version.queueConfig(), Set.of(grpcConfig))
+ .withStartupTimeout(Duration.ofSeconds(5))
+ .build();
+ TQECluster cluster = new TQEClusterImpl(configurator)) {
+ cluster.start();
+ }
+ });
+ }
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java
new file mode 100644
index 00000000..88b96e5e
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQETestHelper.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+
+final class TQETestHelper {
+
+ static final Path TEST_TEMP_DIR;
+
+ static {
+ try {
+ TEST_TEMP_DIR = Files.createTempDirectory("tqe-test-");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TQETestHelper() {}
+
+ static Path loadConfig(String resourcePath) {
+ try {
+ return Paths.get(
+ Objects.requireNonNull(TQETestHelper.class.getClassLoader().getResource(resourcePath))
+ .toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static ManagedChannel createReadyChannel(InetSocketAddress address) {
+ return Unreliables.retryUntilSuccess(
+ 60,
+ TimeUnit.SECONDS,
+ () -> {
+ ManagedChannel ch =
+ ManagedChannelBuilder.forAddress(address.getHostName(), address.getPort())
+ .usePlaintext()
+ .maxInboundMessageSize(16 * 1024 * 1024)
+ .keepAliveTime(30, TimeUnit.SECONDS)
+ .keepAliveTimeout(5, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true)
+ .build();
+
+ ch.getState(true);
+ Unreliables.retryUntilTrue(
+ 5,
+ TimeUnit.SECONDS,
+ () -> {
+ io.grpc.ConnectivityState state = ch.getState(false);
+ if (state == io.grpc.ConnectivityState.READY) {
+ return true;
+ }
+ ch.resetConnectBackoff();
+ Thread.sleep(100);
+ return false;
+ });
+ return ch;
+ });
+ }
+}
diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java
new file mode 100644
index 00000000..fadec3b5
--- /dev/null
+++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEVersion.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
+ * All Rights Reserved.
+ */
+
+package org.testcontainers.containers.integration.tqe;
+
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import io.grpc.ManagedChannel;
+import org.testcontainers.containers.tqe.GrpcContainer.GrpcRole;
+import org.testcontainers.containers.tqe.configuration.FileTQEConfigurator;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Encapsulates all version-specific aspects of a TQE test: image, configs, gRPC role names, builder
+ * factory, and the gRPC client.
+ *
+ * Adding a new TQE version (e.g. TQE 4.x) is an Open/Closed-friendly change: add a new constant,
+ * override the abstract methods — no test code needs to change.
+ */
+enum TQEVersion {
+ TQE2("TQE 2.x", "publisher", GrpcRole.PRODUCER) {
+ @Override
+ public DockerImageName imageName() {
+ return IMAGE_TQE2;
+ }
+
+ @Override
+ public Path queueConfig() {
+ return QUEUE_CONFIG_TQE2;
+ }
+
+ @Override
+ public Path grpcConfig() {
+ return GRPC_CONFIG_TQE2;
+ }
+
+ @Override
+ public FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc) {
+ return FileTQEConfigurator.tqe2Builder(imageName(), queue, grpc);
+ }
+
+ @Override
+ public TQEClient client(ManagedChannel channel) {
+ return new TQE2Client(channel);
+ }
+ },
+
+ TQE3("TQE 3.x", "producer", GrpcRole.PRODUCER) {
+ @Override
+ public DockerImageName imageName() {
+ return IMAGE_TQE3;
+ }
+
+ @Override
+ public Path queueConfig() {
+ return QUEUE_CONFIG_TQE3;
+ }
+
+ @Override
+ public Path grpcConfig() {
+ return GRPC_CONFIG_TQE3;
+ }
+
+ @Override
+ public FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc) {
+ return FileTQEConfigurator.tqe3Builder(imageName(), queue, grpc);
+ }
+
+ @Override
+ public TQEClient client(ManagedChannel channel) {
+ return new TQE3Client(channel);
+ }
+ };
+
+ private static final DockerImageName IMAGE_TQE2 =
+ DockerImageName.parse(
+ System.getenv().getOrDefault("TARANTOOL_REGISTRY", "")
+ + "tarantool/message-queue-ee:2.5.3");
+
+ private static final DockerImageName IMAGE_TQE3 =
+ DockerImageName.parse(
+ System.getenv().getOrDefault("TARANTOOL_REGISTRY", "")
+ + "tarantool/message-queue-ee:v3.5.0");
+
+ private static final Path QUEUE_CONFIG_TQE2 =
+ TQETestHelper.loadConfig("tqe2/simple-config/simple-queue.yml");
+ private static final Path GRPC_CONFIG_TQE2 =
+ TQETestHelper.loadConfig("tqe2/simple-config/simple-grpc.yml");
+ private static final Path QUEUE_CONFIG_TQE3 =
+ TQETestHelper.loadConfig("tqe3/simple-config/simple-queue.yml");
+ private static final Path GRPC_CONFIG_TQE3 =
+ TQETestHelper.loadConfig("tqe3/simple-config/simple-grpc.yml");
+
+ private final String displayName;
+ private final String producerRoleName;
+ private final GrpcRole producerRole;
+
+ TQEVersion(String displayName, String producerRoleName, GrpcRole producerRole) {
+ this.displayName = displayName;
+ this.producerRoleName = producerRoleName;
+ this.producerRole = producerRole;
+ }
+
+ public String producerRoleName() {
+ return producerRoleName;
+ }
+
+ public GrpcRole producerRole() {
+ return producerRole;
+ }
+
+ public abstract DockerImageName imageName();
+
+ public abstract Path queueConfig();
+
+ public abstract Path grpcConfig();
+
+ public abstract FileTQEConfigurator.Builder configuratorBuilder(Path queue, Set grpc);
+
+ /**
+ * Creates a {@link TQEClient} bound to the given channel. The channel is owned by the caller; the
+ * client does not shut it down.
+ */
+ public abstract TQEClient client(ManagedChannel channel);
+
+ static Stream all() {
+ return Stream.of(values());
+ }
+
+ @Override
+ public String toString() {
+ return displayName;
+ }
+}
diff --git a/testcontainers/src/test/proto/tqe2/messages/message.proto b/testcontainers/src/test/proto/tqe2/messages/message.proto
new file mode 100644
index 00000000..595f4740
--- /dev/null
+++ b/testcontainers/src/test/proto/tqe2/messages/message.proto
@@ -0,0 +1,53 @@
+syntax = "proto3";
+
+package tarantool.queue_ee;
+
+option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol";
+option java_package = "tarantool.queue_ee.v2";
+option java_outer_classname = "Message";
+
+// Пара ключ-значение
+message Pair {
+ // Ключ пары
+ string key = 1;
+
+ // Значение пары
+ string value = 2;
+}
+
+// Сообщение в очереди
+message QueueMessage {
+ // Идентификатор сообщения
+ // Заполняется автоматически при записи сообщения в очередь
+ uint64 id = 1;
+
+ // Название очереди в которую необходимо опубликовать сообщение
+ string queue = 2;
+
+ // Ключ маршрутизации сообщения (тип сообщения)
+ // необходим для фильтрации сообщений из очереди на консьюмерах
+ optional string routing_key = 3;
+
+ // Ключ шардирования
+ // необходим для распределения данных в системе
+ optional string sharding_key = 4;
+
+ // Ключ дедупликации
+ // необходим для проверки повторных сообщений,
+ // если не указан, то проверка не производится
+ optional string deduplication_key = 5;
+
+ // Произвольные данные в бинарном формате, содержит тело сообщения
+ bytes payload = 6;
+
+ // Произвольные данные в бинарном формате,
+ // содержит дополнительные для сообщения данные,
+ // необходимые для отладки и трассировки
+ map metadata = 7 [deprecated = true];
+
+ // Время вставки сообщения в очередь в наносекундах
+ int64 timestamp = 8;
+
+ // Произвольные данные в формате списка из пар ключ-значения
+ repeated Pair metadata_pairs = 9;
+}
diff --git a/testcontainers/src/test/proto/tqe2/services/consumer.proto b/testcontainers/src/test/proto/tqe2/services/consumer.proto
new file mode 100644
index 00000000..dc24e94f
--- /dev/null
+++ b/testcontainers/src/test/proto/tqe2/services/consumer.proto
@@ -0,0 +1,58 @@
+syntax = "proto3";
+
+package tarantool.queue_ee;
+
+import "messages/message.proto";
+
+option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol";
+option java_package = "tarantool.queue_ee.v2";
+option java_outer_classname = "Consumer";
+
+// Сервер подписок на сообщения брокера очередей
+service ConsumerService {
+ // Подписка на сообщения с фильтром
+ rpc Subscribe(SubscriptionRequest) returns (stream SubscriptionNotifications);
+}
+
+// Запрос на подписку
+message SubscriptionRequest {
+ // Название очереди
+ string queue = 1;
+
+ // Ключ маршрутизации сообщения (тип сообщения)
+ // необходим для фильтрации сообщений из очереди
+ // Если не указан, то подписка происходит на все типы сообщений в очереди
+ optional string routing_key = 2;
+
+ // Опциональная строка указатель на последнее полученное сообщение.
+ // Необходимо для возможности получения истории сообщений
+ // или восстановления работы консьюмера после сбоя
+ // Значение не указано - подписка с текущего момента
+ // Значение пустая строка - подписка с начала очереди
+ // Значение указано - подписка с указанного сообщения в очереди
+ optional string cursor = 3;
+
+ // Ключ шардирования
+ // необходим для распределения данных в системе
+ // Если не указан, то подписка происходит на все типы сообщений в очереди
+ optional string sharding_key = 4;
+
+ // Ключи шардирования позволяют производить фильтрацию по нескольким ключам
+ // шардирования в рамках одной подписки
+ repeated string sharding_keys = 5;
+}
+
+// Сообщение в стриме подписки
+message SubscriptionNotifications {
+ // Новые сообщения в очереди с курсорами
+ repeated SubscriptionNotification notifications = 1;
+}
+
+// Уведомление клиента о новых сообщение в очереди
+message SubscriptionNotification {
+ // Строка-указатель сообщения
+ string cursor = 1;
+
+ // Сообщение
+ QueueMessage message = 2;
+}
diff --git a/testcontainers/src/test/proto/tqe2/services/publisher.proto b/testcontainers/src/test/proto/tqe2/services/publisher.proto
new file mode 100644
index 00000000..949e50f2
--- /dev/null
+++ b/testcontainers/src/test/proto/tqe2/services/publisher.proto
@@ -0,0 +1,263 @@
+syntax = "proto3";
+
+package tarantool.queue_ee;
+
+import "messages/message.proto";
+
+option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol";
+option java_package = "tarantool.queue_ee.v2";
+option java_outer_classname = "Publisher";
+
+// Сервер публикации сообщений брокера очередей
+service PublisherService {
+ // Публикация сообщения в очередь
+ rpc Publish(PublishRequest) returns (PublishResponse);
+ // Публикация сообщений в очередь через двусторонний стрим
+ rpc PublishStream(stream PublishStreamRequest) returns (stream PublishStreamResponse);
+
+ // Публикация набора сообщений в очередь
+ rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse);
+ // Публикация набора сообщений в очередь через двусторонний стрим
+ rpc PublishBatchStream(
+ stream PublishBatchStreamRequest
+ ) returns (
+ stream PublishBatchStreamResponse
+ );
+
+ // Публикация сообщения на указанные шарды очереди
+ rpc Broadcast(BroadcastRequest) returns (BroadcastResponse);
+}
+
+// Режим дедупликации сообщения
+enum Deduplication {
+ DEDUPLICATION_UNSPECIFIED = 0;
+ DEDUPLICATION_BASIC = 1;
+ DEDUPLICATION_EXTENDED = 2;
+ DEDUPLICATION_KEEP_FIRST = 3;
+ DEDUPLICATION_KEEP_LATEST = 4;
+}
+
+// Запрос на публикацию сообщения в очередь
+message PublishRequest {
+ // Название очереди в которой необходимо опубликовать сообщение
+ string queue = 1;
+
+ // Ключ маршрутизации сообщения (тип сообщения)
+ // необходим для фильтрации сообщений из очереди на консьюмерах
+ optional string routing_key = 2;
+
+ // Ключ шардирования
+ // необходим для распределения данных в системе
+ optional string sharding_key = 3;
+
+ // Ключ дедупликации
+ // необходим для проверки повторных сообщений,
+ // если не указан, то проверка не производится
+ optional string deduplication_key = 4;
+
+ // Произвольные данные в бинарном формате, содержит тело сообщения
+ bytes payload = 5;
+
+ // Произвольные данные в бинарном формате,
+ // содержит дополнительные для сообщения данные,
+ // необходимые для отладки и трассировки
+ map metadata = 6 [deprecated = true];
+
+ // Произвольные данные в формате списка из пар ключ-значения
+ repeated Pair metadata_pairs = 7;
+
+ // Режим дедупликации сообщения
+ Deduplication deduplication = 8;
+}
+
+// Запрос на публикация набора сообщений в очередь
+message PublishBatchRequest {
+ // Название очереди в которой необходимо опубликовать сообщения
+ string queue = 1;
+
+ // Ключ шардирования
+ // необходим для распределения данных в системе
+ optional string sharding_key = 2;
+
+ // Набор сообщений
+ repeated BatchRequestMessage messages = 3;
+
+ // Содержит дополнительные данные необходимые для отладки и трассировки
+ map metadata = 4 [deprecated = true];
+
+ // Произвольные данные в формате списка из пар ключ-значения
+ repeated Pair metadata_pairs = 5;
+
+ // Режим дедупликации сообщения
+ Deduplication deduplication = 8;
+}
+
+// Набор сообщений
+message BatchRequestMessage {
+ // Ключ маршрутизации сообщения (тип сообщения)
+ // необходим для фильтрации сообщений из очереди на консьюмерах
+ optional string routing_key = 1;
+
+ // Ключ дедупликации
+ // Необходим для проверки повторных сообщений,
+ // если не указан, то проверка не производится
+ optional string deduplication_key = 2;
+
+ // Произвольные данные в бинарном формате, содержит тело сообщения
+ bytes payload = 3;
+
+ // Произвольные данные в бинарном формате,
+ // содержит дополнительные для сообщения данные,
+ // необходимые для отладки и трассировки
+ map metadata = 4 [deprecated = true];
+
+ // Произвольные данные в формате списка из пар ключ-значения
+ repeated Pair metadata_pairs = 5;
+}
+
+// Ответ на публикацию набора сообщений
+message PublishBatchResponse {
+ // Идентификаторы сообщений
+ repeated uint64 ids = 1;
+ // Содержит дополнительные данные необходимые для отладки и трассировки
+ map metadata = 2 [deprecated = true];
+ // Флаги наличия дубликатов
+ repeated bool is_duplicates = 3;
+}
+
+// Ответ на публикацию сообщения
+message PublishResponse {
+ // Идентификатор сообщения добавленного в очередь
+ // (возможно не нужно)
+ uint64 id = 1;
+ // Содержит дополнительные данные необходимые для отладки и трассировки
+ map metadata = 2 [deprecated = true];
+ // Если true, то был дубликат сообщения
+ bool is_duplicate = 3;
+}
+
+// Зарос на публикацию сообщения через двусторонний стрим
+message PublishStreamRequest {
+ // Идентификатор запроса на публикацию сообщения
+ uint64 request_id = 1;
+
+ // Запрос на публикацию сообщения
+ PublishRequest request = 2;
+}
+
+// Ответ на публикацию сообщения через двусторонний стрим
+message PublishStreamResponse {
+ // Идентификатор запроса на публикацию сообщения
+ uint64 request_id = 1;
+
+ oneof result {
+ // Сообщение об успешной публикации
+ PublishResponse success = 2;
+ // Сообщение об ошибке публикации
+ Error error = 3;
+ }
+}
+
+// Запрос на публикацию набора сообщений через двусторонний стрим
+message PublishBatchStreamRequest {
+ // Идентификатор запроса на публикацию сообщения
+ uint64 request_id = 1;
+
+ // Запрос на публикацию набора сообщений
+ PublishBatchRequest request = 2;
+}
+
+// Ответ на публикацию набора сообщений через двусторонний стрим
+message PublishBatchStreamResponse {
+ // Идентификатор запроса на публикацию сообщения
+ uint64 request_id = 1;
+
+ oneof result {
+ // Сообщение об успешной публикации
+ PublishBatchResponse success = 2;
+ // Сообщение об ошибке публикации
+ Error error = 3;
+ }
+}
+
+// Запрос на рассылку сообщения на указанные шарды
+message BroadcastRequest {
+ // Название очереди, в которую необходимо опубликовать сообщение
+ string queue = 1;
+
+ // Ключ маршрутизации сообщения (тип сообщения)
+ // необходим для фильтрации сообщений из очереди на консьюмерах
+ optional string routing_key = 2;
+
+ // Ключ дедупликации
+ // необходим для проверки повторных сообщений,
+ // если не указан, то проверка не производится
+ optional string deduplication_key = 3;
+
+ // Произвольные данные в бинарном формате, содержит тело сообщения
+ bytes payload = 4;
+
+ // Произвольные данные в бинарном формате,
+ // содержит дополнительные для сообщения данные,
+ // необходимые для отладки и трассировки
+ map metadata = 5 [deprecated = true];
+
+ // Список с названиями репликасетов, на которые нужно опубликовать сообщение.
+ // По умолчанию рассылка происходит на все шарды.
+ repeated string replicasets = 6;
+
+ // Максимальное время на рассылку сообщения
+ optional uint64 timeout = 7;
+
+ // Произвольные данные в формате списка из пар ключ-значения
+ repeated Pair metadata_pairs = 8;
+
+ // Режим дедупликации сообщения
+ Deduplication deduplication = 9;
+}
+
+// Сообщение об успешной публикации
+message Success {
+ // Идентификатор сообщения добавленного в очередь
+ uint64 id = 1;
+ // Флаги наличия дубликатов
+ bool is_duplicate = 2;
+}
+
+// Сообщение об ошибке публикации
+message Error {
+ // Код ошибки
+ uint32 code = 1;
+ // Сообщение об ошибке
+ string message = 2;
+}
+
+// Ответ репликасета на публикацию сообщения
+message ReplicasetResponse {
+ // Сообщение с результатами публикации сообщения
+ oneof result {
+ // Сообщение об успешной публикации
+ Success success = 1;
+
+ // Сообщение об ошибке публикации
+ Error error = 2;
+ }
+}
+
+// Ответ на рассылку сообщения
+message BroadcastResponse {
+ // Код завершения рассылки:
+ // 0 - Успешная публикация
+ // 1 - Ошибка на роутере
+ // 2 - Ошибка на репликасете
+ uint32 code = 1;
+
+ // Сообщение об ошибке
+ optional string error = 2;
+
+ // Набор ответов с шардов
+ map replicasets = 3;
+
+ // Содержит дополнительные данные необходимые для отладки и трассировки
+ map metadata = 4 [deprecated = true];
+}
diff --git a/testcontainers/src/test/proto/messages/cursor.proto b/testcontainers/src/test/proto/tqe3/messages/cursor.proto
similarity index 100%
rename from testcontainers/src/test/proto/messages/cursor.proto
rename to testcontainers/src/test/proto/tqe3/messages/cursor.proto
diff --git a/testcontainers/src/test/proto/messages/message.proto b/testcontainers/src/test/proto/tqe3/messages/message.proto
similarity index 100%
rename from testcontainers/src/test/proto/messages/message.proto
rename to testcontainers/src/test/proto/tqe3/messages/message.proto
diff --git a/testcontainers/src/test/proto/services/consumer.proto b/testcontainers/src/test/proto/tqe3/services/consumer.proto
similarity index 100%
rename from testcontainers/src/test/proto/services/consumer.proto
rename to testcontainers/src/test/proto/tqe3/services/consumer.proto
diff --git a/testcontainers/src/test/proto/services/producer.proto b/testcontainers/src/test/proto/tqe3/services/producer.proto
similarity index 100%
rename from testcontainers/src/test/proto/services/producer.proto
rename to testcontainers/src/test/proto/tqe3/services/producer.proto
diff --git a/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml b/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml
new file mode 100644
index 00000000..d8044bac
--- /dev/null
+++ b/testcontainers/src/test/resources/tqe2/simple-config/simple-grpc.yml
@@ -0,0 +1,21 @@
+core_port: 1111
+grpc_listen:
+ - uri: 'tcp://0.0.0.0:18182'
+
+publisher:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ routers:
+ - "router:3301"
+
+consumer:
+ enabled: true
+ tarantool:
+ user: test-super
+ pass: test
+ connections:
+ storage:
+ - "master:3301"
diff --git a/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml b/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml
new file mode 100644
index 00000000..63397f99
--- /dev/null
+++ b/testcontainers/src/test/resources/tqe2/simple-config/simple-queue.yml
@@ -0,0 +1,66 @@
+# Credentials
+credentials:
+ users:
+ test-super:
+ password: 'test'
+ roles: [ super ]
+ admin:
+ password: 'secret-cluster-cookie'
+ roles: [ super ]
+ replicator:
+ password: 'secret'
+ roles: [ replication ]
+ storage:
+ roles: [ sharding ]
+ password: storage
+
+# advertise configs for all nodes
+iproto:
+ advertise:
+ peer:
+ login: replicator
+ sharding:
+ login: storage
+ password: storage
+
+roles: [ roles.metrics-export ]
+# queues configs
+roles_cfg:
+ app.roles.queue:
+ queues:
+ - name: test
+ deduplication_mode: keep_latest
+ disabled_filters_by: [ sharding_key ]
+ roles.metrics-export:
+ http:
+ - listen: 8081
+ endpoints:
+ - format: prometheus
+ path: '/metrics'
+
+groups:
+ routers:
+ replicasets:
+ r-1:
+ sharding:
+ roles: [ router ]
+ roles: [ app.roles.api ]
+ instances:
+ router:
+ iproto:
+ listen:
+ - uri: router:3301
+ storages:
+ replicasets:
+ shard-1:
+ replication:
+ failover: manual
+ sharding:
+ roles: [ storage ]
+ roles: [ app.roles.queue ]
+ leader: master
+ instances:
+ master:
+ iproto:
+ listen:
+ - uri: master:3301
diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml b/testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml
similarity index 84%
rename from testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml
rename to testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml
index 2c4c609f..9353580b 100644
--- a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml
+++ b/testcontainers/src/test/resources/tqe3/simple-config/simple-grpc.yml
@@ -1,6 +1,6 @@
core_port: 1111
grpc_listen:
-- uri: 'tcp://0.0.0.0:18182'
+ - uri: 'tcp://0.0.0.0:18182'
producer:
enabled: true
@@ -13,7 +13,7 @@ producer:
retry_delay: 5s
connections:
routers:
- - "router:3301"
+ - "router:3301"
consumer:
enabled: true
@@ -26,4 +26,4 @@ consumer:
retry_delay: 5s
connections:
storage:
- - "master:3301"
+ - "master:3301"
diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml b/testcontainers/src/test/resources/tqe3/simple-config/simple-queue.yml
similarity index 100%
rename from testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml
rename to testcontainers/src/test/resources/tqe3/simple-config/simple-queue.yml