From 3485578acecac3ad2db85efa8d405eee2375039e Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 4 Apr 2025 19:51:38 +0200
Subject: [PATCH 01/45] Test for remote+s3Cluster
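Add coverage for wrapping s3Cluster() in remote(), with hedged requests both
enabled and disabled. The shape under test, mirroring the new cases:
    SELECT * FROM remote('s0_0_1', s3Cluster(
        'cluster_simple',
        'http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV',
        'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'))
    ORDER BY (name, value, polygon)
    LIMIT 1
    SETTINGS use_hedged_requests = 1
The result must match the same read through a plain s3() call.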
---
tests/integration/test_s3_cluster/test.py | 56 ++++++++++++++++++++++-
1 file changed, 55 insertions(+), 1 deletion(-)
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index e8bf031021e2..67c3120a8820 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -360,7 +360,7 @@ def test_parallel_distributed_insert_select_with_schema_inference(started_cluste
node.query(
"""
CREATE TABLE parallel_insert_select ON CLUSTER 'first_shard' (a String, b UInt64)
- ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
+ ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/parallel_insert_select', '{replica}')
ORDER BY (a, b);
"""
)
@@ -508,3 +508,57 @@ def test_cluster_default_expression(started_cluster):
)
assert result == expected_result
+
+
+def test_remote_hedged(started_cluster):
+ node = started_cluster.instances["s0_0_0"]
+ pure_s3 = node.query(
+ """
+ SELECT * from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ ORDER BY (name, value, polygon)
+ LIMIT 1
+ """
+ )
+ s3_distributed = node.query(
+ """
+ SELECT * from remote('s0_0_1', s3Cluster(
+ 'cluster_simple',
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'))
+ ORDER BY (name, value, polygon)
+ LIMIT 1
+ SETTINGS use_hedged_requests=True
+ """
+ )
+
+ assert TSV(pure_s3) == TSV(s3_distributed)
+
+
+def test_remote_no_hedged(started_cluster):
+ node = started_cluster.instances["s0_0_0"]
+ pure_s3 = node.query(
+ """
+ SELECT * from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ ORDER BY (name, value, polygon)
+ LIMIT 1
+ """
+ )
+ s3_distributed = node.query(
+ """
+ SELECT * from remote('s0_0_1', s3Cluster(
+ 'cluster_simple',
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'))
+ ORDER BY (name, value, polygon)
+ LIMIT 1
+ SETTINGS use_hedged_requests=False
+ """
+ )
+
+ assert TSV(pure_s3) == TSV(s3_distributed)
From 32a8fbb3a3f29c932c465e4ba51b3c9129f83b35 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 10 Dec 2024 14:19:46 +0100
Subject: [PATCH 02/45] Use INITIAL_QUERY for remote() call
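remote() forwards queries as SECONDARY_QUERY by default, but when it wraps a
cluster function such as s3Cluster() the target host has to act as the
initiator for its own cluster. Mark the forwarded query as INITIAL_QUERY in
the ClientInfo sent by RemoteQueryExecutor, plumbed through ReadFromRemote
via setRemoteFunction().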
---
src/Interpreters/ClusterProxy/executeQuery.cpp | 1 +
src/Processors/QueryPlan/ReadFromRemote.cpp | 5 ++++-
src/Processors/QueryPlan/ReadFromRemote.h | 2 ++
src/QueryPipeline/RemoteQueryExecutor.cpp | 6 +++++-
src/QueryPipeline/RemoteQueryExecutor.h | 4 ++++
5 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 1a5af59e8ca2..1753592b5d8d 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -444,6 +444,7 @@ void executeQuery(
not_optimized_cluster->getName());
read_from_remote->setStepDescription("Read from remote replica");
+ read_from_remote->setRemoteFunction(is_remote_function);
plan->addStep(std::move(read_from_remote));
plan->addInterpreterContext(new_context);
plans.emplace_back(std::move(plan));
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index 4717e954bdc0..cf167a0a3d5e 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -456,7 +456,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
my_stage = stage, my_storage = storage,
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
query_tree = shard.query_tree, planner_context = shard.planner_context,
- pushed_down_filters]() mutable
+ pushed_down_filters, my_is_remote_function = is_remote_function]() mutable
-> QueryPipelineBuilder
{
auto current_settings = my_context->getSettingsRef();
@@ -540,6 +540,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
+ remote_query_executor->setRemoteFunction(my_is_remote_function);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
QueryPipelineBuilder builder;
@@ -624,6 +625,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
priority_func);
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
+ remote_query_executor->setRemoteFunction(is_remote_function);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -643,6 +645,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
+ remote_query_executor->setRemoteFunction(is_remote_function);
if (context->canUseTaskBasedParallelReplicas())
{
diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h
index e100d99900ec..ee49b508e1fc 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.h
+++ b/src/Processors/QueryPlan/ReadFromRemote.h
@@ -46,6 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
void enableMemoryBoundMerging();
void enforceAggregationInOrder();
+ void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
private:
ClusterProxy::SelectStreamFactory::Shards shards;
@@ -61,6 +62,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
UInt32 shard_count;
const String cluster_name;
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
+ bool is_remote_function = false;
Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const Header & out_header);
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard, const Header & out_header);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 2d805f663911..d3ca6ede9fa9 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -404,7 +404,11 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
- modified_client_info.query_kind = query_kind;
+
+ if (is_remote_function)
+ modified_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
+ else
+ modified_client_info.query_kind = query_kind;
if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index 2077990da946..a9766d1cce59 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -212,6 +212,8 @@ class RemoteQueryExecutor
void setLogger(LoggerPtr logger) { log = logger; }
+ void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
+
const Block & getHeader() const { return header; }
IConnections & getConnections() { return *connections; }
@@ -301,6 +303,8 @@ class RemoteQueryExecutor
bool packet_in_progress = false;
+ bool is_remote_function = false;
+
/// Parts uuids, collected from remote replicas
std::vector<UUID> duplicated_part_uuids;
From ca122a521562a5ff99089be41b6d91e626f21d12 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 10 Dec 2024 19:53:20 +0100
Subject: [PATCH 03/45] Avoid CLIENT_INFO_DOES_NOT_MATCH error
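The receiving server validates ClientInfo for initial queries, so an
INITIAL_QUERY arriving over a server-to-server connection trips the
CLIENT_INFO_DOES_NOT_MATCH check. Use setInitialQuery() and present the
client as "ClickHouse server" to pass validation.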
---
src/QueryPipeline/RemoteQueryExecutor.cpp | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index d3ca6ede9fa9..ec5cc334d4c8 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -406,7 +406,10 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
ClientInfo modified_client_info = context->getClientInfo();
if (is_remote_function)
- modified_client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
+ {
+ modified_client_info.setInitialQuery();
+ modified_client_info.client_name = "ClickHouse server";
+ }
else
modified_client_info.query_kind = query_kind;
From 18a5350ec5abcc50412f6efd0e647915e6bb5dac Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 10 Dec 2024 22:19:01 +0100
Subject: [PATCH 04/45] Workaround for remote with multiple shards
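Forcing INITIAL_QUERY is only correct when remote() resolves to a single
shard; with an address pattern every shard would claim to be the initiator.
Gate the change on shard_count == 1 for now, leaving shapes like the
following (arguments elided) on the old code path:
    SELECT * FROM remote('1.1.1.{1,2}', s3Cluster('cluster_simple', ...))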
---
src/Processors/QueryPlan/ReadFromRemote.cpp | 3 +++
src/QueryPipeline/RemoteQueryExecutor.cpp | 3 ++-
src/QueryPipeline/RemoteQueryExecutor.h | 3 +++
3 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp
index cf167a0a3d5e..22fc1ca17dee 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.cpp
+++ b/src/Processors/QueryPlan/ReadFromRemote.cpp
@@ -541,6 +541,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, my_stage);
remote_query_executor->setRemoteFunction(my_is_remote_function);
+ remote_query_executor->setShardCount(my_shard_count);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
QueryPipelineBuilder builder;
@@ -626,6 +627,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
remote_query_executor->setRemoteFunction(is_remote_function);
+ remote_query_executor->setShardCount(shard_count);
if (!table_func_ptr)
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -646,6 +648,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
remote_query_executor->setRemoteFunction(is_remote_function);
+ remote_query_executor->setShardCount(shard_count);
if (context->canUseTaskBasedParallelReplicas())
{
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index ec5cc334d4c8..0a410b5ff5cf 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -405,7 +405,8 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
- if (is_remote_function)
+ /// Multiple shards, like "remote('1.1.1.{1,2}')", are not supported yet
+ if (is_remote_function && (shard_count == 1))
{
modified_client_info.setInitialQuery();
modified_client_info.client_name = "ClickHouse server";
diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h
index a9766d1cce59..32d0cca3480d 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.h
+++ b/src/QueryPipeline/RemoteQueryExecutor.h
@@ -214,6 +214,8 @@ class RemoteQueryExecutor
void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
+ void setShardCount(UInt32 shard_count_) { shard_count = shard_count_; }
+
const Block & getHeader() const { return header; }
IConnections & getConnections() { return *connections; }
@@ -304,6 +306,7 @@ class RemoteQueryExecutor
bool packet_in_progress = false;
bool is_remote_function = false;
+ UInt32 shard_count = 0;
/// Parts uuids, collected from remote replicas
std::vector<UUID> duplicated_part_uuids;
From 8ac5a7387307e1715bece4f2fe121ee501fe2fca Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 10 Dec 2024 22:37:23 +0100
Subject: [PATCH 05/45] Workaround for CLIENT_INFO_DOES_NOT_MATCH with 'TCP not
 equal to HTTP'
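The same ClientInfo check also compares interfaces: a query initiated over
HTTP and re-sent over a native TCP connection fails with 'TCP not equal to
HTTP'. Pin the reported interface to TCP for the forwarded initial query.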
---
src/QueryPipeline/RemoteQueryExecutor.cpp | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 0a410b5ff5cf..37e60f3f5115 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -410,6 +410,7 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
{
modified_client_info.setInitialQuery();
modified_client_info.client_name = "ClickHouse server";
+ modified_client_info.interface = ClientInfo::Interface::TCP;
}
else
modified_client_info.query_kind = query_kind;
From 63759393729d8dcf86ca5001b8d07e9ac660db8c Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 11 Dec 2024 00:16:42 +0100
Subject: [PATCH 06/45] Keep initial_query_id for remote with INITIAL_QUERY
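Context::setCurrentQueryId() used to overwrite initial_query_id on every
INITIAL_QUERY. Preserve an already-assigned initial_query_id so the whole
remote()+s3Cluster() chain is attributed to one initial query in query_log.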
---
src/Interpreters/Context.cpp | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 21db654fc005..fcdf29be80dd 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -2825,8 +2825,11 @@ void Context::setCurrentQueryId(const String & query_id)
client_info.current_query_id = query_id_to_set;
- if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
+ if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
+ && client_info.initial_query_id.empty())
+ {
client_info.initial_query_id = client_info.current_query_id;
+ }
}
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)
From 8f526b8237ad280cc61fd6ba368eb5738ef22b8a Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 3 Jan 2025 18:38:12 +0100
Subject: [PATCH 07/45] Preserve initial_query_id on the server only
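Restrict the previous change: only the server keeps an existing
initial_query_id; other application types (client, local) reset it for each
new initial query, as before.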
---
src/Interpreters/Context.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index fcdf29be80dd..db5731edf574 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -2826,7 +2826,7 @@ void Context::setCurrentQueryId(const String & query_id)
client_info.current_query_id = query_id_to_set;
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
- && client_info.initial_query_id.empty())
+ && (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
{
client_info.initial_query_id = client_info.current_query_id;
}
From 49571830ed207bb0816b36aa054ec5d79b1dd0f9 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 14 Jan 2025 12:48:16 +0100
Subject: [PATCH 08/45] Fixes after review comments
---
src/Interpreters/ClusterProxy/executeQuery.cpp | 2 +-
src/Processors/QueryPlan/ReadFromRemote.h | 2 +-
tests/integration/test_storage_iceberg/test.py | 8 ++++++++
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 1753592b5d8d..03e702f5a64a 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -444,7 +444,7 @@ void executeQuery(
not_optimized_cluster->getName());
read_from_remote->setStepDescription("Read from remote replica");
- read_from_remote->setRemoteFunction(is_remote_function);
+ read_from_remote->setIsRemoteFunction(is_remote_function);
plan->addStep(std::move(read_from_remote));
plan->addInterpreterContext(new_context);
plans.emplace_back(std::move(plan));
diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h
index ee49b508e1fc..84b2d28dd6a0 100644
--- a/src/Processors/QueryPlan/ReadFromRemote.h
+++ b/src/Processors/QueryPlan/ReadFromRemote.h
@@ -46,7 +46,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
void enableMemoryBoundMerging();
void enforceAggregationInOrder();
- void setRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
+ void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
private:
ClusterProxy::SelectStreamFactory::Shards shards;
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 440f83adb38d..007c3f1945f8 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -635,6 +635,14 @@ def add_df(mode):
)
assert len(cluster_secondary_queries) == 1
+ select_remote_cluster = (
+ instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
+ .strip()
+ .split()
+ )
+ assert len(select_remote_cluster) == 600
+ assert select_remote_cluster == select_regular
+
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
From c640138d5af4729cba90c9168414133a8d8ebeac Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 24 Dec 2024 16:16:03 +0100
Subject: [PATCH 09/45] s3Cluster hive optimization
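With hive-style layouts the partition key is encoded in the object path, so a
WHERE on a partition column can prune whole files before any data is read.
Add an ObjectFilterStep whose predicate is pushed down to the file iterator.
The new test writes key=1..4 under data/hive/ and checks, roughly:
    SELECT count()
    FROM s3Cluster(cluster_simple,
        'http://minio1:9001/root/data/hive/key=**.parquet',
        'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
    WHERE key <= 2
    SETTINGS use_hive_partitioning = 1
that ReadBufferFromS3Bytes is measurably lower than with
use_hive_partitioning = 0.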
---
src/Planner/Planner.cpp | 22 ++++
src/Processors/QueryPlan/ObjectFilterStep.cpp | 58 +++++++++
src/Processors/QueryPlan/ObjectFilterStep.h | 35 ++++++
.../optimizePrimaryKeyConditionAndLimit.cpp | 5 +
.../QueryPlan/QueryPlanStepRegistry.cpp | 2 +
.../StorageObjectStorageCluster.cpp | 2 +-
.../StorageObjectStorageCluster.h | 1 -
tests/integration/test_s3_cluster/test.py | 113 +++++++++++++++++-
8 files changed, 235 insertions(+), 3 deletions(-)
create mode 100644 src/Processors/QueryPlan/ObjectFilterStep.cpp
create mode 100644 src/Processors/QueryPlan/ObjectFilterStep.h
diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp
index d24aebafa181..4da15aeeab77 100644
--- a/src/Planner/Planner.cpp
+++ b/src/Planner/Planner.cpp
@@ -37,6 +37,7 @@
#include
#include
#include
+#include <Processors/QueryPlan/ObjectFilterStep.h>
#include
#include
@@ -133,6 +134,7 @@ namespace Setting
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
+ extern const SettingsBool use_hive_partitioning;
}
namespace ServerSetting
@@ -414,6 +416,19 @@ void addFilterStep(QueryPlan & query_plan,
query_plan.addStep(std::move(where_step));
}
+void addObjectFilterStep(QueryPlan & query_plan,
+ FilterAnalysisResult & filter_analysis_result,
+ const std::string & step_description)
+{
+ auto actions = std::move(filter_analysis_result.filter_actions->dag);
+
+ auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
+ std::move(actions),
+ filter_analysis_result.filter_column_name);
+ where_step->setStepDescription(step_description);
+ query_plan.addStep(std::move(where_step));
+}
+
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
const AggregationAnalysisResult & aggregation_analysis_result,
const QueryAnalysisResult & query_analysis_result,
@@ -1680,6 +1695,13 @@ void Planner::buildPlanForQueryNode()
if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
{
+ if (settings[Setting::use_hive_partitioning]
+ && !query_processing_info.isFirstStage()
+ && expression_analysis_result.hasWhere())
+ {
+ addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
+ }
+
if (query_processing_info.isFromAggregationState())
{
/// Aggregation was performed on remote shards
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp
new file mode 100644
index 000000000000..7c03a3699e31
--- /dev/null
+++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp
@@ -0,0 +1,58 @@
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+ObjectFilterStep::ObjectFilterStep(
+ const Header & input_header_,
+ ActionsDAG actions_dag_,
+ String filter_column_name_)
+ : actions_dag(std::move(actions_dag_))
+ , filter_column_name(std::move(filter_column_name_))
+{
+ input_headers.emplace_back(std::move(input_header_));
+ output_header = input_headers.front();
+}
+
+QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
+{
+ return std::move(pipelines.front());
+}
+
+void ObjectFilterStep::updateOutputHeader()
+{
+ output_header = input_headers.front();
+}
+
+void ObjectFilterStep::serialize(Serialization & ctx) const
+{
+ writeStringBinary(filter_column_name, ctx.out);
+
+ actions_dag.serialize(ctx.out, ctx.registry);
+}
+
+std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
+{
+ if (ctx.input_headers.size() != 1)
+ throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");
+
+ String filter_column_name;
+ readStringBinary(filter_column_name, ctx.in);
+
+ ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
+
+ return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
+}
+
+void registerObjectFilterStep(QueryPlanStepRegistry & registry)
+{
+ registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
+}
+
+}
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.h b/src/Processors/QueryPlan/ObjectFilterStep.h
new file mode 100644
index 000000000000..f72cb00c86ab
--- /dev/null
+++ b/src/Processors/QueryPlan/ObjectFilterStep.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Interpreters/ActionsDAG.h>
+
+namespace DB
+{
+
+/// Implements WHERE operation.
+class ObjectFilterStep : public IQueryPlanStep
+{
+public:
+ ObjectFilterStep(
+ const Header & input_header_,
+ ActionsDAG actions_dag_,
+ String filter_column_name_);
+
+ String getName() const override { return "ObjectFilter"; }
+ QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
+
+ const ActionsDAG & getExpression() const { return actions_dag; }
+ ActionsDAG & getExpression() { return actions_dag; }
+ const String & getFilterColumnName() const { return filter_column_name; }
+
+ void serialize(Serialization & ctx) const override;
+
+ static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
+
+private:
+ void updateOutputHeader() override;
+
+ ActionsDAG actions_dag;
+ String filter_column_name;
+};
+
+}
diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
index ce36c7bddb43..33408e02df87 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp
@@ -3,6 +3,7 @@
#include
#include
#include
+#include <Processors/QueryPlan/ObjectFilterStep.h>
namespace DB::QueryPlanOptimizations
{
@@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
/// So this is likely not needed.
continue;
}
+ else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
+ {
+ source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
+ }
else
{
break;
diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
index 0df21ff9d057..c378594ef9ce 100644
--- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
+++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp
@@ -48,6 +48,7 @@ void registerOffsetStep(QueryPlanStepRegistry & registry);
void registerFilterStep(QueryPlanStepRegistry & registry);
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
void registerExtremesStep(QueryPlanStepRegistry & registry);
+void registerObjectFilterStep(QueryPlanStepRegistry & registry);
void QueryPlanStepRegistry::registerPlanSteps()
{
@@ -65,6 +66,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
registerFilterStep(registry);
registerTotalsHavingStep(registry);
registerExtremesStep(registry);
+ registerObjectFilterStep(registry);
}
}
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 07eecc655998..514b5448fe49 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -116,7 +116,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
{
auto iterator = StorageObjectStorageSource::createFileIterator(
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
- local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());
+ local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 0088ff28fc22..ccecf2b2ae4e 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -38,7 +38,6 @@ class StorageObjectStorageCluster : public IStorageCluster
const String engine_name;
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
- NamesAndTypesList virtual_columns;
};
}
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 67c3120a8820..7ade46b32c1d 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -2,7 +2,7 @@
import logging
import os
import shutil
-import time
+import uuid
from email.errors import HeaderParseError
import pytest
@@ -562,3 +562,114 @@ def test_remote_no_hedged(started_cluster):
)
assert TSV(pure_s3) == TSV(s3_distributed)
+
+
+def test_hive_partitioning(started_cluster):
+ node = started_cluster.instances["s0_0_0"]
+ for i in range(1,5):
+ node.query(
+ f"""
+ INSERT
+ INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ VALUES ({i}, {i})
+ """
+ )
+
+ query_id_full = str(uuid.uuid4())
+ result = node.query(
+ """
+ SELECT count()
+ FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ WHERE key <= 2
+ FORMAT TSV
+ SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0
+ """,
+ query_id=query_id_full,
+ )
+ result = int(result)
+ assert result == 2
+
+ query_id_optimized = str(uuid.uuid4())
+ result = node.query(
+ """
+ SELECT count()
+ FROM s3('http://minio1:9001/root/data/hive/key=**.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ WHERE key <= 2
+ FORMAT TSV
+ SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1
+ """,
+ query_id=query_id_optimized,
+ )
+ result = int(result)
+ assert result == 2
+
+ query_id_cluster_full = str(uuid.uuid4())
+ result = node.query(
+ """
+ SELECT count()
+ FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ WHERE key <= 2
+ FORMAT TSV
+ SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 0
+ """,
+ query_id=query_id_cluster_full,
+ )
+ result = int(result)
+ assert result == 2
+
+ query_id_cluster_optimized = str(uuid.uuid4())
+ result = node.query(
+ """
+ SELECT count()
+ FROM s3Cluster(cluster_simple, 'http://minio1:9001/root/data/hive/key=**.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ WHERE key <= 2
+ FORMAT TSV
+ SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1
+ """,
+ query_id=query_id_cluster_optimized,
+ )
+ result = int(result)
+ assert result == 2
+
+ node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+ full_traffic = node.query(
+ f"""
+ SELECT sum(ProfileEvents['ReadBufferFromS3Bytes'])
+ FROM clusterAllReplicas(cluster_simple, system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_full}'
+ FORMAT TSV
+ """)
+ full_traffic = int(full_traffic)
+ assert full_traffic > 0 # 612*4
+
+ optimized_traffic = node.query(
+ f"""
+ SELECT sum(ProfileEvents['ReadBufferFromS3Bytes'])
+ FROM clusterAllReplicas(cluster_simple, system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}'
+ FORMAT TSV
+ """)
+ optimized_traffic = int(optimized_traffic)
+ assert optimized_traffic > 0 # 612*2
+ assert full_traffic > optimized_traffic
+
+ cluster_full_traffic = node.query(
+ f"""
+ SELECT sum(ProfileEvents['ReadBufferFromS3Bytes'])
+ FROM clusterAllReplicas(cluster_simple, system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}'
+ FORMAT TSV
+ """)
+ cluster_full_traffic = int(cluster_full_traffic)
+ assert cluster_full_traffic == full_traffic
+
+ cluster_optimized_traffic = node.query(
+ f"""
+ SELECT sum(ProfileEvents['ReadBufferFromS3Bytes'])
+ FROM clusterAllReplicas(cluster_simple, system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}'
+ FORMAT TSV
+ """)
+ cluster_optimized_traffic = int(cluster_optimized_traffic)
+ assert cluster_optimized_traffic == optimized_traffic
From c23eeb02d8891af83c3593e9ba25d7ea4de33269 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 30 Dec 2024 13:06:51 +0100
Subject: [PATCH 10/45] Style fix
---
src/Processors/QueryPlan/ObjectFilterStep.cpp | 5 +++++
tests/integration/test_s3_cluster/test.py | 14 +++++++++-----
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp
index 7c03a3699e31..c38fac78e502 100644
--- a/src/Processors/QueryPlan/ObjectFilterStep.cpp
+++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp
@@ -9,6 +9,11 @@
namespace DB
{
+namespace ErrorCodes
+{
+ extern const int INCORRECT_DATA;
+}
+
ObjectFilterStep::ObjectFilterStep(
const Header & input_header_,
ActionsDAG actions_dag_,
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 7ade46b32c1d..bd875125d14b 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -566,7 +566,7 @@ def test_remote_no_hedged(started_cluster):
def test_hive_partitioning(started_cluster):
node = started_cluster.instances["s0_0_0"]
- for i in range(1,5):
+ for i in range(1, 5):
node.query(
f"""
INSERT
@@ -639,7 +639,8 @@ def test_hive_partitioning(started_cluster):
FROM clusterAllReplicas(cluster_simple, system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_full}'
FORMAT TSV
- """)
+ """
+ )
full_traffic = int(full_traffic)
assert full_traffic > 0 # 612*4
@@ -649,7 +650,8 @@ def test_hive_partitioning(started_cluster):
FROM clusterAllReplicas(cluster_simple, system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_optimized}'
FORMAT TSV
- """)
+ """
+ )
optimized_traffic = int(optimized_traffic)
assert optimized_traffic > 0 # 612*2
assert full_traffic > optimized_traffic
@@ -660,7 +662,8 @@ def test_hive_partitioning(started_cluster):
FROM clusterAllReplicas(cluster_simple, system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_full}'
FORMAT TSV
- """)
+ """
+ )
cluster_full_traffic = int(cluster_full_traffic)
assert cluster_full_traffic == full_traffic
@@ -670,6 +673,7 @@ def test_hive_partitioning(started_cluster):
FROM clusterAllReplicas(cluster_simple, system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id_cluster_optimized}'
FORMAT TSV
- """)
+ """
+ )
cluster_optimized_traffic = int(cluster_optimized_traffic)
assert cluster_optimized_traffic == optimized_traffic
From 96deb627fad0705f521d2861684eaa3e7dc42c26 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 30 Dec 2024 15:55:00 +0100
Subject: [PATCH 11/45] Fix tidy build
---
src/Processors/QueryPlan/ObjectFilterStep.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/Processors/QueryPlan/ObjectFilterStep.cpp b/src/Processors/QueryPlan/ObjectFilterStep.cpp
index c38fac78e502..2ae2294a571b 100644
--- a/src/Processors/QueryPlan/ObjectFilterStep.cpp
+++ b/src/Processors/QueryPlan/ObjectFilterStep.cpp
@@ -21,7 +21,7 @@ ObjectFilterStep::ObjectFilterStep(
: actions_dag(std::move(actions_dag_))
, filter_column_name(std::move(filter_column_name_))
{
- input_headers.emplace_back(std::move(input_header_));
+ input_headers.emplace_back(input_header_);
output_header = input_headers.front();
}
From 82bcc635bb4bcf15e33c27e5ef2768f91111abb5 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 30 Dec 2024 20:12:10 +0100
Subject: [PATCH 12/45] Fix test
---
tests/integration/test_s3_cluster/test.py | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index bd875125d14b..f82397d4be64 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -567,13 +567,23 @@ def test_remote_no_hedged(started_cluster):
def test_hive_partitioning(started_cluster):
node = started_cluster.instances["s0_0_0"]
for i in range(1, 5):
- node.query(
+ exists = node.query(
f"""
- INSERT
- INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
- VALUES ({i}, {i})
+ SELECT
+ count()
+ FROM s3('http://minio1:9001/root/data/hive/key={i}/*', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ GROUP BY ALL
+ FORMAT TSV
"""
)
+ if int(exists) == 0:
+ node.query(
+ f"""
+ INSERT
+ INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
+ VALUES ({i}, {i})
+ """
+ )
query_id_full = str(uuid.uuid4())
result = node.query(
From 096c406b8042a7a0ed1da3926b5c10dbdb0cd439 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 2 Jan 2025 12:06:56 +0100
Subject: [PATCH 13/45] Do not use ObjectFilter when not required
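Only add ObjectFilterStep when the plan root is actually a ReadFromCluster
source; for other sources the extra step is dead weight. Moving the
ReadFromCluster class from IStorageCluster.cpp into IStorageCluster.h makes
it visible to the Planner for that typeid_cast.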
---
src/Planner/Planner.cpp | 5 +++-
src/Storages/IStorageCluster.cpp | 46 -------------------------------
src/Storages/IStorageCluster.h | 47 ++++++++++++++++++++++++++++++++
3 files changed, 51 insertions(+), 47 deletions(-)
diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp
index 4da15aeeab77..51bb86d34a94 100644
--- a/src/Planner/Planner.cpp
+++ b/src/Planner/Planner.cpp
@@ -1699,7 +1699,10 @@ void Planner::buildPlanForQueryNode()
&& !query_processing_info.isFirstStage()
&& expression_analysis_result.hasWhere())
{
- addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
+ if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
+ {
+ addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
+ }
}
if (query_processing_info.isFromAggregationState())
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 219092e7ab5a..28b5a84166a2 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -15,7 +15,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -47,51 +46,6 @@ IStorageCluster::IStorageCluster(
{
}
-class ReadFromCluster : public SourceStepWithFilter
-{
-public:
- std::string getName() const override { return "ReadFromCluster"; }
- void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
- void applyFilters(ActionDAGNodes added_filter_nodes) override;
-
- ReadFromCluster(
- const Names & column_names_,
- const SelectQueryInfo & query_info_,
- const StorageSnapshotPtr & storage_snapshot_,
- const ContextPtr & context_,
- Block sample_block,
- std::shared_ptr<IStorageCluster> storage_,
- ASTPtr query_to_send_,
- QueryProcessingStage::Enum processed_stage_,
- ClusterPtr cluster_,
- LoggerPtr log_)
- : SourceStepWithFilter(
- std::move(sample_block),
- column_names_,
- query_info_,
- storage_snapshot_,
- context_)
- , storage(std::move(storage_))
- , query_to_send(std::move(query_to_send_))
- , processed_stage(processed_stage_)
- , cluster(std::move(cluster_))
- , log(log_)
- {
- }
-
-private:
- std::shared_ptr<IStorageCluster> storage;
- ASTPtr query_to_send;
- QueryProcessingStage::Enum processed_stage;
- ClusterPtr cluster;
- LoggerPtr log;
-
- std::optional<RemoteQueryExecutor::Extension> extension;
-
- void createExtension(const ActionsDAG::Node * predicate);
- ContextPtr updateSettings(const Settings & settings);
-};
-
void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
{
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index d000e24562ff..4d7a047e0c3e 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -4,6 +4,7 @@
#include
#include
#include
+#include <Processors/QueryPlan/SourceStepWithFilter.h>
namespace DB
{
@@ -52,4 +53,50 @@ class IStorageCluster : public IStorage
};
+class ReadFromCluster : public SourceStepWithFilter
+{
+public:
+ std::string getName() const override { return "ReadFromCluster"; }
+ void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+ void applyFilters(ActionDAGNodes added_filter_nodes) override;
+
+ ReadFromCluster(
+ const Names & column_names_,
+ const SelectQueryInfo & query_info_,
+ const StorageSnapshotPtr & storage_snapshot_,
+ const ContextPtr & context_,
+ Block sample_block,
+ std::shared_ptr<IStorageCluster> storage_,
+ ASTPtr query_to_send_,
+ QueryProcessingStage::Enum processed_stage_,
+ ClusterPtr cluster_,
+ LoggerPtr log_)
+ : SourceStepWithFilter(
+ std::move(sample_block),
+ column_names_,
+ query_info_,
+ storage_snapshot_,
+ context_)
+ , storage(std::move(storage_))
+ , query_to_send(std::move(query_to_send_))
+ , processed_stage(processed_stage_)
+ , cluster(std::move(cluster_))
+ , log(log_)
+ {
+ }
+
+private:
+ std::shared_ptr<IStorageCluster> storage;
+ ASTPtr query_to_send;
+ QueryProcessingStage::Enum processed_stage;
+ ClusterPtr cluster;
+ LoggerPtr log;
+
+ std::optional<RemoteQueryExecutor::Extension> extension;
+
+ void createExtension(const ActionsDAG::Node * predicate);
+ ContextPtr updateSettings(const Settings & settings);
+};
+
+
}
From 1f0d9f6d6a35eaca0f59bfb2441ac7dec5010637 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 2 Jan 2025 14:00:06 +0100
Subject: [PATCH 14/45] Fix test
---
tests/integration/test_s3_cluster/test.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index f82397d4be64..03788dd4c128 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -581,7 +581,8 @@ def test_hive_partitioning(started_cluster):
f"""
INSERT
INTO FUNCTION s3('http://minio1:9001/root/data/hive/key={i}/data.parquet', 'minio', 'minio123', 'Parquet', 'key Int32, value Int32')
- VALUES ({i}, {i})
+ SELECT {i}, {i}
+ SETTINGS use_hive_partitioning = 0
"""
)
From 1117652a8191c22d450131d744037846b0b73064 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Wed, 15 Jan 2025 16:17:05 +0100
Subject: [PATCH 15/45] Alternative syntax for object storage cluster functions
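Let the plain object storage functions run distributed without the *Cluster
spelling: when object_storage_cluster_function_cluster is set, the fallback
wrapper parses and executes the call as the cluster variant. A sketch of the
intended equivalence (cluster name and URL are illustrative):
    SELECT * FROM s3('http://minio1:9001/root/data/{clickhouse,database}/*',
        'minio', 'minio123', 'CSV')
    SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'
behaves like s3Cluster('cluster_simple', ...) with the same remaining arguments.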
---
src/Core/Settings.cpp | 9 +
src/Core/SettingsChangesHistory.cpp | 5 +
.../StorageObjectStorageCluster.cpp | 16 +-
.../StorageObjectStorageCluster.h | 3 +
.../TableFunctionObjectStorage.cpp | 32 --
.../TableFunctionObjectStorageCluster.cpp | 1 +
.../TableFunctionObjectStorageCluster.h | 2 -
...leFunctionObjectStorageClusterFallback.cpp | 325 ++++++++++++++++++
...ableFunctionObjectStorageClusterFallback.h | 49 +++
src/TableFunctions/registerTableFunctions.cpp | 1 +
src/TableFunctions/registerTableFunctions.h | 1 +
tests/integration/test_s3_cluster/test.py | 157 +++++++++
.../test_storage_azure_blob_storage/test.py | 6 +-
.../test_cluster.py | 57 ++-
.../integration/test_storage_iceberg/test.py | 51 ++-
15 files changed, 666 insertions(+), 49 deletions(-)
create mode 100644 src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
create mode 100644 src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index 8551fa6b60bd..d674fed8e47e 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6089,6 +6089,15 @@ Trigger processor to spill data into external storage adpatively. grace join is
/** Experimental tsToGrid aggregate function. */ \
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
+)", EXPERIMENTAL) \
+ DECLARE(String, object_storage_cluster_function_cluster, "", R"(
Cluster for making distributed requests to object storages with the alternative syntax.
+)", EXPERIMENTAL) \
+ DECLARE(UInt64, object_storage_cluster_function_max_hosts, 0, R"(
Limit on the number of hosts used for a request in object storage cluster table functions (azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.).
+Possible values:
+- Positive integer.
+- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
\
/* ####################################################### */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index dc6809ffa283..4346360cc0fb 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -149,6 +149,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Release closed. Please use 25.1
});
addSettingsChanges(settings_changes_history, "24.11",
+ {"object_storage_cluster_function_cluster", "", "", "New setting"},
+ {"object_storage_cluster_function_max_hosts", 0, 0, "New setting"},
+ }
+ },
+ {"24.11",
{
{"validate_mutation_query", false, true, "New setting to validate mutation queries by default."},
{"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."},
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 514b5448fe49..54478dab1de8 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -59,6 +59,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
, configuration{configuration_}
, object_storage(object_storage_)
+ , cluster_name_in_settings(false)
{
ColumnsDescription columns{columns_};
std::string sample_path;
@@ -105,10 +106,17 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
configuration->getEngineName());
}
- ASTPtr cluster_name_arg = args.front();
- args.erase(args.begin());
- configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
- args.insert(args.begin(), cluster_name_arg);
+ if (cluster_name_in_settings)
+ {
+ configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
+ }
+ else
+ {
+ ASTPtr cluster_name_arg = args.front();
+ args.erase(args.begin());
+ configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
+ args.insert(args.begin(), cluster_name_arg);
+ }
}
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index ccecf2b2ae4e..32a942d4a857 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -29,6 +29,8 @@ class StorageObjectStorageCluster : public IStorageCluster
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
+ void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; }
+
private:
void updateQueryToSendIfNeeded(
ASTPtr & query,
@@ -38,6 +40,7 @@ class StorageObjectStorageCluster : public IStorageCluster
const String engine_name;
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
+ bool cluster_name_in_settings;
};
}
diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp
index ba1a05732f78..7849094a71d7 100644
--- a/src/TableFunctions/TableFunctionObjectStorage.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorage.cpp
@@ -183,38 +183,6 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
.allow_readonly = false
});
#endif
-
-#if USE_AZURE_BLOB_STORAGE
- factory.registerFunction<TableFunctionObjectStorage<AzureDefinition, StorageAzureConfiguration>>(
- {
- .documentation =
- {
- .description=R"(The table function can be used to read the data stored on Azure Blob Storage.)",
- .examples{
- {
- "azureBlobStorage",
- "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
- "[account_name, account_key, format, compression, structure])", ""
- }}
- },
- .allow_readonly = false
- });
-#endif
-#if USE_HDFS
- factory.registerFunction<TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>>(
- {
- .documentation =
- {
- .description=R"(The table function can be used to read the data stored on HDFS virtual filesystem.)",
- .examples{
- {
- "hdfs",
- "SELECT * FROM hdfs(url, format, compression, structure])", ""
- }}
- },
- .allow_readonly = false
- });
-#endif
}
#if USE_AZURE_BLOB_STORAGE
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
index e8e7009cff66..3d086b183060 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp
@@ -22,6 +22,7 @@ StoragePtr TableFunctionObjectStorageCluster::execute
auto configuration = Base::getConfiguration();
ColumnsDescription columns;
+
if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context);
else if (!Base::structure_hint.empty())
diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h
index 54afae400288..d30ea6aa1e03 100644
--- a/src/TableFunctions/TableFunctionObjectStorageCluster.h
+++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h
@@ -10,8 +10,6 @@ namespace DB
class Context;
-class StorageS3Settings;
-class StorageAzureBlobSettings;
class StorageS3Configuration;
class StorageAzureConfiguration;
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
new file mode 100644
index 000000000000..94408ecb80fb
--- /dev/null
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -0,0 +1,325 @@
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace Setting
+{
+ extern const SettingsString object_storage_cluster_function_cluster;
+}
+
+namespace ErrorCodes
+{
+ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+
+struct S3ClusterFallbackDefinition
+{
+ static constexpr auto name = "s3";
+ static constexpr auto storage_type_name = "S3";
+ static constexpr auto storage_type_cluster_name = "S3Cluster";
+};
+
+struct AzureClusterFallbackDefinition
+{
+ static constexpr auto name = "azureBlobStorage";
+ static constexpr auto storage_type_name = "Azure";
+ static constexpr auto storage_type_cluster_name = "AzureBlobStorageCluster";
+};
+
+struct HDFSClusterFallbackDefinition
+{
+ static constexpr auto name = "hdfs";
+ static constexpr auto storage_type_name = "HDFS";
+ static constexpr auto storage_type_cluster_name = "HDFSCluster";
+};
+
+struct IcebergS3ClusterFallbackDefinition
+{
+ static constexpr auto name = "icebergS3";
+ static constexpr auto storage_type_name = "S3";
+ static constexpr auto storage_type_cluster_name = "IcebergS3Cluster";
+};
+
+struct IcebergAzureClusterFallbackDefinition
+{
+ static constexpr auto name = "icebergAzure";
+ static constexpr auto storage_type_name = "Azure";
+ static constexpr auto storage_type_cluster_name = "IcebergAzureCluster";
+};
+
+struct IcebergHDFSClusterFallbackDefinition
+{
+ static constexpr auto name = "icebergHDFS";
+ static constexpr auto storage_type_name = "HDFS";
+ static constexpr auto storage_type_cluster_name = "IcebergHDFSCluster";
+};
+
+struct DeltaLakeClusterFallbackDefinition
+{
+ static constexpr auto name = "deltaLake";
+ static constexpr auto storage_type_name = "S3";
+ static constexpr auto storage_type_cluster_name = "DeltaLakeS3Cluster";
+};
+
+struct HudiClusterFallbackDefinition
+{
+ static constexpr auto name = "hudi";
+ static constexpr auto storage_type_name = "S3";
+ static constexpr auto storage_type_cluster_name = "HudiS3Cluster";
+};
+
+template <typename Definition, typename Base>
+void TableFunctionObjectStorageClusterFallback<Definition, Base>::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
+{
+ if (args.empty())
+ throw Exception(
+ ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+ "The function {} should have arguments. The first argument must be the cluster name and the rest are the arguments of "
+ "corresponding table function",
+ getName());
+
+ const auto & settings = context->getSettingsRef();
+
+ is_cluster_function = !settings[Setting::object_storage_cluster_function_cluster].value.empty();
+
+ if (is_cluster_function)
+ {
+ ASTPtr cluster_name_arg = std::make_shared<ASTLiteral>(settings[Setting::object_storage_cluster_function_cluster].value);
+ args.insert(args.begin(), cluster_name_arg);
+ BaseCluster::parseArgumentsImpl(args, context);
+ args.erase(args.begin());
+ }
+ else
+ BaseSimple::parseArgumentsImpl(args, context);
+}
+
+template <typename Definition, typename Base>
+StoragePtr TableFunctionObjectStorageClusterFallback<Definition, Base>::executeImpl(
+ const ASTPtr & ast_function,
+ ContextPtr context,
+ const std::string & table_name,
+ ColumnsDescription cached_columns,
+ bool is_insert_query) const
+{
+ if (is_cluster_function)
+ {
+ auto result = BaseCluster::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query);
+ if (auto storage = typeid_cast<std::shared_ptr<StorageObjectStorageCluster>>(result))
+ storage->setClusterNameInSettings(true);
+ return result;
+ }
+ else
+ return BaseSimple::executeImpl(ast_function, context, table_name, cached_columns, is_insert_query);
+}
+
+#if USE_AWS_S3
+using TableFunctionS3ClusterFallback = TableFunctionObjectStorageClusterFallback<S3ClusterFallbackDefinition, TableFunctionS3Cluster>;
+#endif
+
+#if USE_AZURE_BLOB_STORAGE
+using TableFunctionAzureClusterFallback = TableFunctionObjectStorageClusterFallback<AzureClusterFallbackDefinition, TableFunctionAzureBlobCluster>;
+#endif
+
+#if USE_HDFS
+using TableFunctionHDFSClusterFallback = TableFunctionObjectStorageClusterFallback<HDFSClusterFallbackDefinition, TableFunctionHDFSCluster>;
+#endif
+
+#if USE_AVRO && USE_AWS_S3
+using TableFunctionIcebergS3ClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergS3ClusterFallbackDefinition, TableFunctionIcebergS3Cluster>;
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+using TableFunctionIcebergAzureClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergAzureClusterFallbackDefinition, TableFunctionIcebergAzureCluster>;
+#endif
+
+#if USE_AVRO && USE_HDFS
+using TableFunctionIcebergHDFSClusterFallback = TableFunctionObjectStorageClusterFallback<IcebergHDFSClusterFallbackDefinition, TableFunctionIcebergHDFSCluster>;
+#endif
+
+#if USE_AWS_S3 && USE_PARQUET
+using TableFunctionDeltaLakeClusterFallback = TableFunctionObjectStorageClusterFallback<DeltaLakeClusterFallbackDefinition, TableFunctionDeltaLakeCluster>;
+#endif
+
+#if USE_AWS_S3
+using TableFunctionHudiClusterFallback = TableFunctionObjectStorageClusterFallback<HudiClusterFallbackDefinition, TableFunctionHudiCluster>;
+#endif
+
+void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory)
+{
+ UNUSED(factory);
+#if USE_AWS_S3
+ factory.registerFunction<TableFunctionS3ClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the data stored on S3 in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {"s3", "SELECT * FROM s3(url, format, structure)", ""},
+ {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster_function_cluster='cluster'", ""}
+ },
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AZURE_BLOB_STORAGE
+ factory.registerFunction<TableFunctionAzureClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the data stored on Azure Blob Storage in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "azureBlobStorage",
+ "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
+ "[account_name, account_key, format, compression, structure])", ""
+ },
+ {
+ "azureBlobStorage",
+ "SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
+ "[account_name, account_key, format, compression, structure]) "
+ "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_HDFS
+ factory.registerFunction<TableFunctionHDFSClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the data stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "hdfs",
+ "SELECT * FROM hdfs(url, format, compression, structure])", ""
+ },
+ {
+ "hdfs",
+ "SELECT * FROM hdfs(url, format, compression, structure]) "
+ "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AVRO && USE_AWS_S3
+ factory.registerFunction<TableFunctionIcebergS3ClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "icebergS3",
+ "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""
+ },
+ {
+ "icebergS3",
+ "SELECT * FROM icebergS3(url, access_key_id, secret_access_key) "
+ "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AVRO && USE_AZURE_BLOB_STORAGE
+ factory.registerFunction<TableFunctionIcebergAzureClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "icebergAzure",
+ "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""
+ },
+ {
+ "icebergAzure",
+ "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key) "
+ "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AVRO && USE_HDFS
+ factory.registerFunction<TableFunctionIcebergHDFSClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "icebergHDFS",
+ "SELECT * FROM icebergHDFS(url)", ""
+ },
+ {
+ "icebergHDFS",
+ "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AWS_S3 && USE_PARQUET
+ factory.registerFunction<TableFunctionDeltaLakeClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "deltaLake",
+ "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""
+ },
+ {
+ "deltaLake",
+ "SELECT * FROM deltaLake(url, access_key_id, secret_access_key) "
+ "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+
+#if USE_AWS_S3
+ factory.registerFunction<TableFunctionHudiClusterFallback>(
+ {
+ .documentation = {
+ .description=R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster or from single node.)",
+ .examples{
+ {
+ "hudi",
+ "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""
+ },
+ {
+ "hudi",
+ "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ },
+ }
+ },
+ .allow_readonly = false
+ }
+ );
+#endif
+}
+
+}
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
new file mode 100644
index 000000000000..5485f08d54da
--- /dev/null
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
@@ -0,0 +1,49 @@
+#pragma once
+#include "config.h"
+#include
+
+namespace DB
+{
+
+/**
+* Class implementing s3/hdfs/azureBlobStorage(...) table functions,
+* which allows choosing between the simple and the distributed variant of a function based on settings.
+* If the setting `object_storage_cluster_function_cluster` is empty,
+* the simple single-host variant is used; if it is not empty, the cluster variant is used.
+* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster_function_cluster='cluster'`
+* is equivalent to
+* `SELECT * FROM s3Cluster('cluster', 's3://...', ...)`
+*/
+
+template <typename Definition, typename Base>
+class TableFunctionObjectStorageClusterFallback : public Base
+{
+public:
+ using BaseCluster = Base;
+ using BaseSimple = BaseCluster::Base;
+
+ virtual ~TableFunctionObjectStorageClusterFallback() override = default;
+
+ static constexpr auto name = Definition::name;
+
+ String getName() const override { return name; }
+
+private:
+ const char * getStorageTypeName() const override
+ {
+ return is_cluster_function ? Definition::storage_type_cluster_name : Definition::storage_type_name;
+ }
+
+ StoragePtr executeImpl(
+ const ASTPtr & ast_function,
+ ContextPtr context,
+ const std::string & table_name,
+ ColumnsDescription cached_columns,
+ bool is_insert_query) const override;
+
+ void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override;
+
+ bool is_cluster_function = false;
+};
+
+}
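A minimal usage sketch of the fallback behaviour documented in the header comment above; the bucket path and cluster name are placeholders:

    -- single-host variant
    SELECT * FROM s3('s3://bucket/data.csv', 'CSV');

    -- distributed variant selected via the setting
    SELECT * FROM s3('s3://bucket/data.csv', 'CSV')
    SETTINGS object_storage_cluster_function_cluster = 'cluster_simple';

    -- equivalent explicit cluster function
    SELECT * FROM s3Cluster('cluster_simple', 's3://bucket/data.csv', 'CSV');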
diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp
index 131ca783f73f..c7b852b96fc8 100644
--- a/src/TableFunctions/registerTableFunctions.cpp
+++ b/src/TableFunctions/registerTableFunctions.cpp
@@ -65,6 +65,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
+ registerTableFunctionObjectStorageClusterFallback(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
}
diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h
index 8b7d1b0cf60e..142948213352 100644
--- a/src/TableFunctions/registerTableFunctions.h
+++ b/src/TableFunctions/registerTableFunctions.h
@@ -61,6 +61,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
+void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 03788dd4c128..04fdc22ce991 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -124,8 +124,16 @@ def test_select_all(started_cluster):
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)"""
)
# print(s3_distributed)
+ s3_distributed_alt_syntax = node.query(
+ """
+ SELECT * from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
assert TSV(pure_s3) == TSV(s3_distributed)
+ assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax)
def test_count(started_cluster):
@@ -146,8 +154,17 @@ def test_count(started_cluster):
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(s3_distributed)
+ s3_distributed_alt_syntax = node.query(
+ """
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
assert TSV(pure_s3) == TSV(s3_distributed)
+ assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax)
def test_count_macro(started_cluster):
@@ -169,8 +186,17 @@ def test_count_macro(started_cluster):
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(s3_distributed)
+ s3_distributed_alt_syntax = node.query(
+ """
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
assert TSV(s3_macro) == TSV(s3_distributed)
+ assert TSV(s3_macro) == TSV(s3_distributed_alt_syntax)
def test_union_all(started_cluster):
@@ -211,8 +237,25 @@ def test_union_all(started_cluster):
"""
)
# print(s3_distributed)
+ s3_distributed_alt_syntax = node.query(
+ """
+ SELECT * FROM
+ (
+ SELECT * from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ UNION ALL
+ SELECT * from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ )
+ ORDER BY (name, value, polygon)
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'
+ """
+ )
assert TSV(pure_s3) == TSV(s3_distributed)
+ assert TSV(pure_s3) == TSV(s3_distributed_alt_syntax)
def test_wrong_cluster(started_cluster):
@@ -233,6 +276,21 @@ def test_wrong_cluster(started_cluster):
assert "not found" in error
+ error = node.query_and_get_error(
+ """
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ UNION ALL
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster_function_cluster = 'non_existing_cluster'
+ """
+ )
+
+ assert "not found" in error
+
def test_ambiguous_join(started_cluster):
node = started_cluster.instances["s0_0_0"]
@@ -266,6 +324,17 @@ def test_skip_unavailable_shards(started_cluster):
assert result == "10\n"
+ result = node.query(
+ """
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/clickhouse/part1.csv',
+ 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS skip_unavailable_shards = 1, object_storage_cluster_function_cluster = 'cluster_non_existent_port'
+ """
+ )
+
+ assert result == "10\n"
+
def test_unset_skip_unavailable_shards(started_cluster):
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
@@ -281,6 +350,17 @@ def test_unset_skip_unavailable_shards(started_cluster):
assert result == "10\n"
+ result = node.query(
+ """
+ SELECT count(*) from s3(
+ 'http://minio1:9001/root/data/clickhouse/part1.csv',
+ 'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_non_existent_port'
+ """
+ )
+
+ assert result == "10\n"
+
def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
@@ -412,6 +492,20 @@ def test_cluster_with_header(started_cluster):
)
== "SomeValue\n"
)
+ assert (
+ node.query(
+ """SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'))
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+ == "SomeValue\n"
+ )
+ assert (
+ node.query(
+ """SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'), 'CSV')
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+ == "SomeValue\n"
+ )
def test_cluster_with_named_collection(started_cluster):
@@ -431,6 +525,20 @@ def test_cluster_with_named_collection(started_cluster):
assert TSV(pure_s3) == TSV(s3_cluster)
+ s3_cluster = node.query(
+ """SELECT * from s3(test_s3) ORDER BY (c1, c2, c3)
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert TSV(pure_s3) == TSV(s3_cluster)
+
+ s3_cluster = node.query(
+ """SELECT * from s3(test_s3, structure='auto') ORDER BY (c1, c2, c3)
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert TSV(pure_s3) == TSV(s3_cluster)
+
def test_cluster_format_detection(started_cluster):
node = started_cluster.instances["s0_0_0"]
@@ -461,6 +569,20 @@ def test_cluster_format_detection(started_cluster):
assert result == expected_result
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123') order by c1, c2
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', auto, 'a String, b UInt64') order by a, b
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
def test_cluster_default_expression(started_cluster):
node = started_cluster.instances["s0_0_0"]
@@ -509,6 +631,41 @@ def test_cluster_default_expression(started_cluster):
assert result == expected_result
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262') order by id
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262') order by id
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
+ result = node.query(
+ """SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
+ result = node.query(
+ """SELECT * FROM s3(test_s3_with_default) order by id
+ SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ )
+
+ assert result == expected_result
+
def test_remote_hedged(started_cluster):
node = started_cluster.instances["s0_0_0"]
diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py
index 12a511b6e889..b079d3e2109b 100644
--- a/tests/integration/test_storage_azure_blob_storage/test.py
+++ b/tests/integration/test_storage_azure_blob_storage/test.py
@@ -41,14 +41,14 @@ def cluster():
def azure_query(
- node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None
+ node, query, expect_error=False, try_num=10, settings={}, query_on_retry=None, query_id=None
):
for i in range(try_num):
try:
if expect_error:
- return node.query_and_get_error(query, settings=settings)
+ return node.query_and_get_error(query, settings=settings, query_id=query_id)
else:
- return node.query(query, settings=settings)
+ return node.query(query, settings=settings, query_id=query_id)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
index aef6e426572e..ea6b9c512952 100644
--- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py
+++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
@@ -8,6 +8,7 @@
import random
import threading
import time
+import uuid
import pytest
from azure.storage.blob import BlobServiceClient
@@ -76,21 +77,64 @@ def test_select_all(cluster):
)
print(get_azure_file_content("test_cluster_select_all.csv", port))
+ query_id_pure = str(uuid.uuid4())
pure_azure = azure_query(
node,
f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV','auto')",
+ query_id=query_id_pure,
)
print(pure_azure)
+ query_id_distributed = str(uuid.uuid4())
distributed_azure = azure_query(
node,
f"SELECT * from azureBlobStorageCluster('simple_cluster', '{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
- f"'auto')"
- "",
+ f"'auto')",
+ query_id=query_id_distributed,
)
print(distributed_azure)
+ query_id_distributed_alt_syntax = str(uuid.uuid4())
+ distributed_azure_alt_syntax = azure_query(
+ node,
+ f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
+ f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
+ f"'auto') "
+ f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'",
+ query_id=query_id_distributed_alt_syntax,
+ )
+ print(distributed_azure_alt_syntax)
assert TSV(pure_azure) == TSV(distributed_azure)
+ assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax)
+ for _, node_ in cluster.instances.items():
+ node_.query("SYSTEM FLUSH LOGS")
+ nodes_pure = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('simple_cluster', system.query_log)
+ WHERE type='QueryFinish'
+ AND initial_query_id='{query_id_pure}'
+ """,
+ )
+ assert int(nodes_pure) == 1
+ nodes_distributed = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('simple_cluster', system.query_log)
+ WHERE type='QueryFinish'
+ AND initial_query_id='{query_id_distributed}'
+ """,
+ )
+ assert int(nodes_distributed) == 3
+ nodes_distributed_alt_syntax = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('simple_cluster', system.query_log)
+ WHERE type='QueryFinish'
+ AND initial_query_id='{query_id_distributed_alt_syntax}'
+ """,
+ )
+ assert int(nodes_distributed_alt_syntax) == 3
def test_count(cluster):
@@ -120,7 +164,16 @@ def test_count(cluster):
f"'auto', 'key UInt64')",
)
print(distributed_azure)
+ distributed_azure_alt_syntax = azure_query(
+ node,
+ f"SELECT count(*) from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', "
+ f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
+ f"'auto', 'key UInt64')"
+ f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'",
+ )
+ print(distributed_azure_alt_syntax)
assert TSV(pure_azure) == TSV(distributed_azure)
+ assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax)
def test_union_all(cluster):
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 007c3f1945f8..cc8de7ca8d58 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -598,16 +598,37 @@ def add_df(mode):
table_function=True,
run_on_cluster=True,
)
+ query_id_cluster = str(uuid.uuid4())
select_cluster = (
- instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
+ instance.query(
+ f"SELECT * FROM {table_function_expr_cluster}", query_id=query_id_cluster
+ )
+ .strip()
+ .split()
+ )
+
+    # Cluster query with node1 as coordinator, using the alternative syntax
+ query_id_cluster_alt_syntax = str(uuid.uuid4())
+ select_cluster_alt_syntax = (
+ instance.query(
+ f"""
+ SELECT * FROM {table_function_expr}
+ SETTINGS object_storage_cluster_function_cluster='cluster_simple'
+ """,
+ query_id=query_id_cluster_alt_syntax,
+ )
+ .strip()
+ .split()
)
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
+ assert len(select_cluster_alt_syntax) == 600
# Actual check
assert select_cluster == select_regular
+ assert select_cluster_alt_syntax == select_regular
# Check query_log
for replica in started_cluster.instances.values():
@@ -619,11 +640,29 @@ def add_df(mode):
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
- type = 'QueryStart' AND
- positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
- position(query, '{TABLE_NAME}') != 0 AND
- position(query, 'system.query_log') = 0 AND
- NOT is_initial_query
+ type = 'QueryStart'
+ AND NOT is_initial_query
+ AND initial_query_id='{query_id_cluster}'
+ """
+ )
+ .strip()
+ .split("\n")
+ )
+
+ logging.info(
+ f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
+ )
+ assert len(cluster_secondary_queries) == 1
+
+ for node_name, replica in started_cluster.instances.items():
+ cluster_secondary_queries = (
+ replica.query(
+ f"""
+ SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
+ WHERE
+ type = 'QueryStart'
+ AND NOT is_initial_query
+ AND initial_query_id='{query_id_cluster_alt_syntax}'
"""
)
.strip()
From 622489cf38f00cd90f446131cde5f1c908391f47 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 17 Jan 2025 11:20:04 +0100
Subject: [PATCH 16/45] Fix build
---
src/TableFunctions/TableFunctionObjectStorageClusterFallback.h | 2 --
1 file changed, 2 deletions(-)
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
index 5485f08d54da..9c7afffb6ed3 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
@@ -22,8 +22,6 @@ class TableFunctionObjectStorageClusterFallback : public Base
using BaseCluster = Base;
using BaseSimple = BaseCluster::Base;
- virtual ~TableFunctionObjectStorageClusterFallback() override = default;
-
static constexpr auto name = Definition::name;
String getName() const override { return name; }
From 2625d1441c316629c727833847dd3b3f41e84097 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 17 Jan 2025 17:12:09 +0100
Subject: [PATCH 17/45] Rename settings
---
src/Core/Settings.cpp | 4 +--
src/Core/SettingsChangesHistory.cpp | 2 ++
...leFunctionObjectStorageClusterFallback.cpp | 22 ++++++------
...ableFunctionObjectStorageClusterFallback.h | 4 +--
tests/integration/test_s3_cluster/test.py | 36 +++++++++----------
.../test_cluster.py | 4 +--
.../integration/test_storage_iceberg/test.py | 2 +-
7 files changed, 38 insertions(+), 36 deletions(-)
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index d674fed8e47e..017638d2be37 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -6090,10 +6090,10 @@ Trigger processor to spill data into external storage adaptively. grace join is
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
)", EXPERIMENTAL) \
- DECLARE(String, object_storage_cluster_function_cluster, "", R"(
+ DECLARE(String, object_storage_cluster, "", R"(
 Cluster used to make distributed requests to object storages with the alternative syntax.
)", EXPERIMENTAL) \
- DECLARE(UInt64, object_storage_cluster_function_max_hosts, 0, R"(
+ DECLARE(UInt64, object_storage_max_nodes, 0, R"(
 Limit on the number of hosts used for a request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.
Possible values:
- Positive integer.
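A sketch of the renamed settings used together, assuming both can be applied to the same query; the URL and cluster name are placeholders:

    SELECT count(*) FROM s3('http://minio1:9001/root/data/*.csv', 'CSV')
    SETTINGS object_storage_cluster = 'cluster_simple', object_storage_max_nodes = 2;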
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 4346360cc0fb..15782cae75d3 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -151,6 +151,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "24.11",
{"object_storage_cluster_function_cluster", "", "", "New setting"},
{"object_storage_cluster_function_max_hosts", 0, 0, "New setting"},
+ {"object_storage_cluster", "", "", "New setting"},
+ {"object_storage_max_nodes", 0, 0, "New setting"},
}
},
{"24.11",
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index 94408ecb80fb..6cdd85597a44 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -9,7 +9,7 @@ namespace DB
namespace Setting
{
- extern const SettingsString object_storage_cluster_function_cluster;
+ extern const SettingsString object_storage_cluster;
}
namespace ErrorCodes
@@ -85,11 +85,11 @@ void TableFunctionObjectStorageClusterFallback::parseArguments
const auto & settings = context->getSettingsRef();
- is_cluster_function = !settings[Setting::object_storage_cluster_function_cluster].value.empty();
+ is_cluster_function = !settings[Setting::object_storage_cluster].value.empty();
if (is_cluster_function)
{
-        ASTPtr cluster_name_arg = std::make_shared<ASTLiteral>(settings[Setting::object_storage_cluster_function_cluster].value);
+        ASTPtr cluster_name_arg = std::make_shared<ASTLiteral>(settings[Setting::object_storage_cluster].value);
args.insert(args.begin(), cluster_name_arg);
BaseCluster::parseArgumentsImpl(args, context);
args.erase(args.begin());
@@ -159,7 +159,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
.description=R"(The table function can be used to read the data stored on S3 in parallel for many nodes in a specified cluster or from single node.)",
.examples{
{"s3", "SELECT * FROM s3(url, format, structure)", ""},
- {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster_function_cluster='cluster'", ""}
+ {"s3", "SELECT * FROM s3(url, format, structure) SETTINGS object_storage_cluster='cluster'", ""}
},
},
.allow_readonly = false
@@ -182,7 +182,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
"azureBlobStorage",
"SELECT * FROM azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
"[account_name, account_key, format, compression, structure]) "
- "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -204,7 +204,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
{
"hdfs",
"SELECT * FROM hdfs(url, format, compression, structure]) "
- "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -226,7 +226,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
{
"icebergS3",
"SELECT * FROM icebergS3(url, access_key_id, secret_access_key) "
- "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -248,7 +248,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
{
"icebergAzure",
"SELECT * FROM icebergAzure(url, access_key_id, secret_access_key) "
- "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -269,7 +269,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
},
{
"icebergHDFS",
- "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SELECT * FROM icebergHDFS(url) SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -291,7 +291,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
{
"deltaLake",
"SELECT * FROM deltaLake(url, access_key_id, secret_access_key) "
- "SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SETTINGS object_storage_cluster='cluster'", ""
},
}
},
@@ -312,7 +312,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
},
{
"hudi",
- "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster_function_cluster='cluster'", ""
+ "SELECT * FROM hudi(url, access_key_id, secret_access_key) SETTINGS object_storage_cluster='cluster'", ""
},
}
},
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
index 9c7afffb6ed3..afa6b8b49f11 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.h
@@ -8,9 +8,9 @@ namespace DB
/**
* Class implementing s3/hdfs/azureBlobStorage(...) table functions,
 * which allows choosing between the simple and the distributed variant of a function based on settings.
-* If the setting `object_storage_cluster_function_cluster` is empty,
+* If the setting `object_storage_cluster` is empty,
 * the simple single-host variant is used; if it is not empty, the cluster variant is used.
-* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster_function_cluster='cluster'`
+* `SELECT * FROM s3('s3://...', ...) SETTINGS object_storage_cluster='cluster'`
 * is equivalent to
* `SELECT * FROM s3Cluster('cluster', 's3://...', ...)`
*/
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index 04fdc22ce991..c20338f96622 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -129,7 +129,7 @@ def test_select_all(started_cluster):
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert TSV(pure_s3) == TSV(s3_distributed)
@@ -160,7 +160,7 @@ def test_count(started_cluster):
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert TSV(pure_s3) == TSV(s3_distributed)
@@ -192,7 +192,7 @@ def test_count_macro(started_cluster):
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert TSV(s3_macro) == TSV(s3_distributed)
@@ -250,7 +250,7 @@ def test_union_all(started_cluster):
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'
+ SETTINGS object_storage_cluster = 'cluster_simple'
"""
)
@@ -285,7 +285,7 @@ def test_wrong_cluster(started_cluster):
SELECT count(*) from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
- SETTINGS object_storage_cluster_function_cluster = 'non_existing_cluster'
+ SETTINGS object_storage_cluster = 'non_existing_cluster'
"""
)
@@ -329,7 +329,7 @@ def test_skip_unavailable_shards(started_cluster):
SELECT count(*) from s3(
'http://minio1:9001/root/data/clickhouse/part1.csv',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
- SETTINGS skip_unavailable_shards = 1, object_storage_cluster_function_cluster = 'cluster_non_existent_port'
+ SETTINGS skip_unavailable_shards = 1, object_storage_cluster = 'cluster_non_existent_port'
"""
)
@@ -355,7 +355,7 @@ def test_unset_skip_unavailable_shards(started_cluster):
SELECT count(*) from s3(
'http://minio1:9001/root/data/clickhouse/part1.csv',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
- SETTINGS object_storage_cluster_function_cluster = 'cluster_non_existent_port'
+ SETTINGS object_storage_cluster = 'cluster_non_existent_port'
"""
)
@@ -495,14 +495,14 @@ def test_cluster_with_header(started_cluster):
assert (
node.query(
"""SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'))
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
== "SomeValue\n"
)
assert (
node.query(
"""SELECT * from s3('http://resolver:8080/bucket/key.csv', headers(MyCustomHeader = 'SomeValue'), 'CSV')
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
== "SomeValue\n"
)
@@ -527,14 +527,14 @@ def test_cluster_with_named_collection(started_cluster):
s3_cluster = node.query(
"""SELECT * from s3(test_s3) ORDER BY (c1, c2, c3)
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert TSV(pure_s3) == TSV(s3_cluster)
s3_cluster = node.query(
"""SELECT * from s3(test_s3, structure='auto') ORDER BY (c1, c2, c3)
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert TSV(pure_s3) == TSV(s3_cluster)
@@ -571,14 +571,14 @@ def test_cluster_format_detection(started_cluster):
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123') order by c1, c2
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', auto, 'a String, b UInt64') order by a, b
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
@@ -633,35 +633,35 @@ def test_cluster_default_expression(started_cluster):
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262') order by id
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262') order by id
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
result = node.query(
"""SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'auto', 'id UInt32, date Date DEFAULT 18262', 'auto') order by id
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
result = node.query(
"""SELECT * FROM s3(test_s3_with_default) order by id
- SETTINGS object_storage_cluster_function_cluster = 'cluster_simple'"""
+ SETTINGS object_storage_cluster = 'cluster_simple'"""
)
assert result == expected_result
diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
index ea6b9c512952..54e1ced79577 100644
--- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py
+++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
@@ -100,7 +100,7 @@ def test_select_all(cluster):
f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
f"'auto') "
- f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'",
+ f"SETTINGS object_storage_cluster='simple_cluster'",
query_id=query_id_distributed_alt_syntax,
)
print(distributed_azure_alt_syntax)
@@ -169,7 +169,7 @@ def test_count(cluster):
f"SELECT count(*) from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', "
f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
f"'auto', 'key UInt64')"
- f"SETTINGS object_storage_cluster_function_cluster='simple_cluster'",
+ f"SETTINGS object_storage_cluster='simple_cluster'",
)
print(distributed_azure_alt_syntax)
assert TSV(pure_azure) == TSV(distributed_azure)
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index cc8de7ca8d58..9b8cddabb265 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -613,7 +613,7 @@ def add_df(mode):
instance.query(
f"""
SELECT * FROM {table_function_expr}
- SETTINGS object_storage_cluster_function_cluster='cluster_simple'
+ SETTINGS object_storage_cluster='cluster_simple'
""",
query_id=query_id_cluster_alt_syntax,
)
From ab06c3b7a5b67399f8d4fe022292ced70687d4e7 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 4 Apr 2025 21:24:20 +0200
Subject: [PATCH 18/45] Fix build
---
src/Core/SettingsChangesHistory.cpp | 13 ++++++-------
.../TableFunctionObjectStorageClusterFallback.cpp | 4 ++--
2 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 15782cae75d3..22541e792191 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -64,6 +64,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
+
+ addSettingsChanges(settings_changes_history, "25.2.antalya",
+ {
+ {"object_storage_cluster", "", "", "New setting"},
+ {"object_storage_max_nodes", 0, 0, "New setting"},
+ });
addSettingsChanges(settings_changes_history, "25.2",
{
{"schema_inference_make_json_columns_nullable", false, false, "Allow to infer Nullable(JSON) during schema inference"},
@@ -149,13 +155,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Release closed. Please use 25.1
});
addSettingsChanges(settings_changes_history, "24.11",
- {"object_storage_cluster_function_cluster", "", "", "New setting"},
- {"object_storage_cluster_function_max_hosts", 0, 0, "New setting"},
- {"object_storage_cluster", "", "", "New setting"},
- {"object_storage_max_nodes", 0, 0, "New setting"},
- }
- },
- {"24.11",
{
{"validate_mutation_query", false, true, "New setting to validate mutation queries by default."},
{"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."},
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
index 6cdd85597a44..1b08e85321fd 100644
--- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
+++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp
@@ -141,7 +141,7 @@ using TableFunctionIcebergAzureClusterFallback = TableFunctionObjectStorageClust
using TableFunctionIcebergHDFSClusterFallback = TableFunctionObjectStorageClusterFallback;
#endif
-#if USE_AWS_S3 && USE_PARQUET
+#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
using TableFunctionDeltaLakeClusterFallback = TableFunctionObjectStorageClusterFallback;
#endif
@@ -278,7 +278,7 @@ void registerTableFunctionObjectStorageClusterFallback(TableFunctionFactory & fa
);
#endif
-#if USE_AWS_S3 && USE_PARQUET
+#if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS
factory.registerFunction(
{
.documentation = {
From 9bd2ea336f5a92ad3ef01b28ae52cd9a24e3f913 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 30 Jan 2025 17:47:06 +0100
Subject: [PATCH 19/45] Distributed request to tables with Object Storage
Engines
---
src/Databases/Iceberg/DatabaseIceberg.cpp | 47 +++++---
.../Iceberg/DatabaseIcebergSettings.cpp | 1 +
src/Storages/IStorageCluster.h | 2 +
.../ObjectStorage/Azure/Configuration.cpp | 33 +++++-
.../ObjectStorage/Azure/Configuration.h | 3 +
.../ObjectStorage/HDFS/Configuration.cpp | 10 ++
.../ObjectStorage/HDFS/Configuration.h | 2 +
.../ObjectStorage/S3/Configuration.cpp | 31 +++++-
src/Storages/ObjectStorage/S3/Configuration.h | 2 +
.../ObjectStorage/StorageObjectStorage.cpp | 6 +-
.../ObjectStorage/StorageObjectStorage.h | 4 +
.../StorageObjectStorageCluster.cpp | 105 +++++++++++++++++-
.../StorageObjectStorageCluster.h | 2 +
.../StorageObjectStorageSettings.cpp | 3 +
.../StorageObjectStorageSource.cpp | 3 +
.../registerStorageObjectStorage.cpp | 60 +++++++---
tests/integration/test_s3_cluster/test.py | 67 +++++++++++
.../test_cluster.py | 67 +++++++++++
.../integration/test_storage_iceberg/test.py | 79 ++++++++++---
19 files changed, 475 insertions(+), 52 deletions(-)
diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp
index 0a8414d1aa93..692a138e46a2 100644
--- a/src/Databases/Iceberg/DatabaseIceberg.cpp
+++ b/src/Databases/Iceberg/DatabaseIceberg.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
#include
#include
@@ -37,10 +38,12 @@ namespace DatabaseIcebergSetting
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsString oauth_server_uri;
extern const DatabaseIcebergSettingsBool vended_credentials;
+ extern const DatabaseIcebergSettingsString object_storage_cluster;
}
namespace Setting
{
extern const SettingsBool allow_experimental_database_iceberg;
+ extern const SettingsString object_storage_cluster;
}
namespace ErrorCodes
@@ -230,19 +233,37 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, storage_settings.get());
-    return std::make_shared<StorageObjectStorage>(
- configuration,
- configuration->createObjectStorage(context_, /* is_readonly */ false),
- context_,
- StorageID(getDatabaseName(), name),
- /* columns */columns,
- /* constraints */ConstraintsDescription{},
- /* comment */"",
- getFormatSettings(context_),
- LoadingStrictnessLevel::CREATE,
- /* distributed_processing */false,
- /* partition_by */nullptr,
- /* lazy_init */true);
+ auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
+ if (cluster_name.empty())
+ cluster_name = context_->getSettingsRef()[Setting::object_storage_cluster].value;
+
+ if (cluster_name.empty())
+ {
+        return std::make_shared<StorageObjectStorage>(
+ configuration,
+ configuration->createObjectStorage(context_, /* is_readonly */ false),
+ context_,
+ StorageID(getDatabaseName(), name),
+ /* columns */columns,
+ /* constraints */ConstraintsDescription{},
+ /* comment */"",
+ getFormatSettings(context_),
+ LoadingStrictnessLevel::CREATE,
+ /* distributed_processing */false,
+ /* partition_by */nullptr,
+ /* lazy_init */true);
+ }
+ else
+ {
+        return std::make_shared<StorageObjectStorageCluster>(
+ cluster_name,
+ configuration,
+ configuration->createObjectStorage(context_, /* is_readonly */ false),
+ StorageID(getDatabaseName(), name),
+ columns,
+ ConstraintsDescription{},
+ context_);
+ }
}
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
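A sketch of how the new database-level setting is intended to be used, assuming the usual DatabaseIceberg DDL; the endpoint, credentials, warehouse and cluster name are placeholders:

    CREATE DATABASE iceberg_db
    ENGINE = Iceberg('http://rest-catalog:8181/v1', 'key', 'secret')
    SETTINGS warehouse = 'demo', object_storage_cluster = 'cluster_simple';

    -- tables resolved through the catalog are then served by StorageObjectStorageCluster
    SELECT count(*) FROM iceberg_db.some_table;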
diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp
index 37b4909106ba..4847309a6283 100644
--- a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp
+++ b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp
@@ -23,6 +23,7 @@ namespace ErrorCodes
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
     DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
+ DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \
#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 4d7a047e0c3e..3bcc467e3135 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -43,6 +43,8 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
+ const String & getClusterName() const { return cluster_name; }
+
protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index a3052b9a5c16..1579c4c52a7d 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -155,6 +155,14 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"));
blobs_paths = {blob_path};
+ if (account_name && account_key)
+ {
+ if (saved_params.empty())
+ {
+ saved_params.push_back(*account_name);
+ saved_params.push_back(*account_key);
+ }
+ }
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}
@@ -174,7 +182,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
     std::unordered_map<std::string_view, size_t> engine_args_to_idx;
-
     String connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
     String container_name = checkAndGetLiteralArgument<String>(engine_args[1], "container");
     blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
@@ -280,6 +287,14 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
blobs_paths = {blob_path};
+ if (account_name && account_key)
+ {
+ if (saved_params.empty())
+ {
+ saved_params.push_back(*account_name);
+ saved_params.push_back(*account_key);
+ }
+ }
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}
@@ -445,6 +460,22 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
+void StorageAzureConfiguration::setFunctionArgs(ASTs & args) const
+{
+ if (!args.empty())
+ { /// Just check
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
+ }
+
+    args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
+    args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.container_name));
+    args.push_back(std::make_shared<ASTLiteral>(blob_path));
+    for (const auto & arg : saved_params)
+    {
+        args.push_back(std::make_shared<ASTLiteral>(arg));
+ }
+}
+
}
#endif
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 72124465c462..5ab8c3d71455 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -79,6 +79,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
+ void setFunctionArgs(ASTs & args) const override;
+
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
@@ -86,6 +88,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
std::string blob_path;
     std::vector<String> blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
+    std::vector<String> saved_params;
};
}
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
index 143cdc756ea9..83e480b09d50 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
@@ -236,6 +236,16 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
+void StorageHDFSConfiguration::setFunctionArgs(ASTs & args) const
+{
+ if (!args.empty())
+ { /// Just check
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
+ }
+
+    args.push_back(std::make_shared<ASTLiteral>(url + path));
+}
+
}
#endif
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h
index db8ab7f9e4db..dbef04aca4aa 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.h
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.h
@@ -65,6 +65,8 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
+ void setFunctionArgs(ASTs & args) const override;
+
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr, bool /* with_structure */) override;
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index 9ffc449a1524..63b618f3a151 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -364,11 +364,11 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
if (engine_args_to_idx.contains("format"))
{
-        format = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
+        auto format_ = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["format"]], "format");
         /// Set format to configuration only if it's not 'auto',
/// because we can have default format set in configuration.
- if (format != "auto")
- format = format;
+ if (format_ != "auto")
+ format = format_;
}
if (engine_args_to_idx.contains("structure"))
@@ -586,6 +586,31 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}
+void StorageS3Configuration::setFunctionArgs(ASTs & args) const
+{
+ if (!args.empty())
+ { /// Just check
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
+ }
+
+    args.push_back(std::make_shared<ASTLiteral>(url.uri_str));
+    if (auth_settings[S3AuthSetting::no_sign_request])
+    {
+        args.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
+    }
+    else
+    {
+        args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
+        args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
+        if (!auth_settings[S3AuthSetting::session_token].value.empty())
+            args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
+        if (format != "auto")
+            args.push_back(std::make_shared<ASTLiteral>(format));
+        if (!compression_method.empty())
+            args.push_back(std::make_shared<ASTLiteral>(compression_method));
+ }
+}
+
}
#endif
diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h
index ad2d136e0586..46a54f4490e0 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.h
+++ b/src/Storages/ObjectStorage/S3/Configuration.h
@@ -97,6 +97,8 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
+ void setFunctionArgs(ASTs & args) const override;
+
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 33b38c678da2..0777ae1bc081 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -63,6 +63,9 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
if (context->getSettingsRef()[Setting::use_hive_partitioning])
local_distributed_processing = false;
+ if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
+ return configuration->getPath();
+
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
@@ -75,9 +78,6 @@ String StorageObjectStorage::getPathSample(ContextPtr context)
{} // file_progress_callback
);
- if (!configuration->isArchive() && !configuration->isPathWithGlobs() && !local_distributed_processing)
- return configuration->getPath();
-
if (auto file = file_iterator->next(0))
return file->getPath();
return "";
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 189c4226ad66..4ac5fc4aafef 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -253,6 +253,10 @@ class StorageObjectStorage::Configuration
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
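+    /// Fill `args` with table function arguments that reproduce this configuration;
+    /// used when a table with an object storage engine is rewritten into the matching table function.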
+ virtual void setFunctionArgs(ASTs & /* args */) const
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method setFunctionArgs is not supported by storage {}", getEngineName());
+ }
protected:
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 54478dab1de8..a0a787a5e7d1 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -4,15 +4,20 @@
#include
#include
#include
+#include
+#include
+#include
+#include
#include
#include
+#include
+#include
#include
#include
#include
#include
-
namespace DB
{
namespace Setting
@@ -23,13 +28,19 @@ namespace Setting
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
+ extern const int UNKNOWN_FUNCTION;
}
+
String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metadata, ContextPtr context)
{
auto query_settings = configuration->getQuerySettings(context);
/// We don't want to throw an exception if there are no files with specified path.
query_settings.throw_on_zero_files_match = false;
+
+ if (!configuration->isArchive() && !configuration->isPathWithGlobs())
+ return configuration->getPath();
+
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
query_settings,
@@ -44,6 +55,7 @@ String StorageObjectStorageCluster::getPathSample(StorageInMemoryMetadata metada
if (auto file = file_iterator->next(0))
return file->getPath();
+
return "";
}
@@ -82,12 +94,103 @@ std::string StorageObjectStorageCluster::getName() const
return configuration->getEngineName();
}
+void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query)
+{
+    // Replace the table engine with the matching table function for a distributed request,
+    // so that for
+    //   CREATE TABLE t (...) ENGINE=IcebergS3(...)
+    //   SELECT * FROM t
+    // the query is rewritten to
+    //   SELECT * FROM icebergS3(...)
+    // and executed on the cluster nodes
+
+    auto * select_query = query->as<ASTSelectQuery>();
+ if (!select_query || !select_query->tables())
+ return;
+
+    auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
+    auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
+ if (!table_expression->database_and_table_name)
+ return;
+
+ auto & table_identifier_typed = table_expression->database_and_table_name->as();
+
+ auto table_alias = table_identifier_typed.tryGetAlias();
+
+    std::unordered_map<String, String> engine_to_function = {
+ {"S3", "s3"},
+ {"Azure", "azureBlobStorage"},
+ {"HDFS", "hdfs"},
+ {"IcebergS3", "icebergS3"},
+ {"IcebergAzure", "icebergAzure"},
+ {"IcebergHDFS", "icebergHDFS"},
+ {"DeltaLake", "deltaLake"},
+ {"Hudi", "hudi"}
+ };
+
+ auto p = engine_to_function.find(configuration->getEngineName());
+ if (p == engine_to_function.end())
+ {
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "Can't find table function for engine {}",
+ configuration->getEngineName()
+ );
+ }
+
+ std::string table_function_name = p->second;
+
+    auto function_ast = std::make_shared<ASTFunction>();
+    function_ast->name = table_function_name;
+    auto arguments = std::make_shared<ASTExpressionList>();
+
+ auto cluster_name = getClusterName();
+
+ if (cluster_name.empty())
+ {
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "Can't be here without cluster name, no cluster name in query {}",
+ queryToString(query));
+ }
+
+ configuration->setFunctionArgs(arguments->children);
+
+ function_ast->arguments = arguments;
+ function_ast->children.push_back(arguments);
+ function_ast->setAlias(table_alias);
+
+ ASTPtr function_ast_ptr(function_ast);
+
+ table_expression->database_and_table_name = nullptr;
+ table_expression->table_function = function_ast_ptr;
+ table_expression->children[0].swap(function_ast_ptr);
+
+ auto settings = select_query->settings();
+ if (settings)
+ {
+ auto & settings_ast = settings->as();
+ settings_ast.changes.insertSetting("object_storage_cluster", cluster_name);
+ }
+ else
+ {
+        auto settings_ast_ptr = std::make_shared<ASTSetQuery>();
+ settings_ast_ptr->is_standalone = false;
+ settings_ast_ptr->changes.setSetting("object_storage_cluster", cluster_name);
+ select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, std::move(settings_ast_ptr));
+ }
+
+ cluster_name_in_settings = true;
+}
+
void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
ASTPtr & query,
const DB::StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context)
{
+ updateQueryForDistributedEngineIfNeeded(query);
+
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
+
if (!expression_list)
{
throw Exception(
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 32a942d4a857..941566a6d12c 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -37,6 +37,8 @@ class StorageObjectStorageCluster : public IStorageCluster
const StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context) override;
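+    /// Rewrite `SELECT ... FROM table` over an object storage engine into a SELECT
+    /// from the matching table function, passing the cluster name via SETTINGS.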
+ void updateQueryForDistributedEngineIfNeeded(ASTPtr & query);
+
const String engine_name;
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp
index 3edf1efccdf9..0d1aa6b04779 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp
@@ -19,6 +19,9 @@ If enabled, indicates that metadata is taken from iceberg specification that is
)", 0) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"(
If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing
+)", 0) \
+ DECLARE(String, object_storage_cluster, "", R"(
+Cluster for distributed requests
)", 0) \
// clang-format on
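A sketch of the new engine-level setting in DDL, mirroring the registerStorageObjectStorage change below; the table, bucket and cluster names are placeholders:

    CREATE TABLE s3_table (name String, value UInt32)
    ENGINE = S3('http://minio1:9001/root/data/*.csv', 'minio', 'minio123', 'CSV')
    SETTINGS object_storage_cluster = 'cluster_simple';

    -- reads from s3_table are rewritten to s3(...) and distributed over the cluster
    SELECT count(*) FROM s3_table;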
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index c779a3cd7622..eb5f7a16aead 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -138,7 +138,10 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
const bool is_archive = configuration->isArchive();
+ configuration->update(object_storage, local_context);
+
std::unique_ptr<IIterator> iterator;
+
if (configuration->isPathWithGlobs())
{
auto path = configuration->getPath();
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index 34ce072eca62..2e93f16e09b7 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -6,6 +6,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -19,6 +20,16 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
+namespace Setting
+{
+ extern const SettingsString object_storage_cluster;
+}
+
+namespace StorageObjectStorageSetting
+{
+ extern const StorageObjectStorageSettingsString object_storage_cluster;
+}
+
namespace
{
@@ -26,7 +37,7 @@ namespace
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS || USE_AVRO
std::shared_ptr<IStorage>
-createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration)
+createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObjectStorage::ConfigurationPtr configuration, ContextPtr context)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
@@ -38,6 +49,8 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
if (args.storage_def->settings)
queue_settings->loadFromQuery(*args.storage_def->settings);
+ auto cluster_name = (*queue_settings)[StorageObjectStorageSetting::object_storage_cluster].value;
+
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, queue_settings.get());
// Use format settings from global server context + settings from
@@ -62,20 +75,37 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
- return std::make_shared(
- configuration,
- // We only want to perform write actions (e.g. create a container in Azure) when the table is being created,
- // and we want to avoid it when we load the table after a server restart.
- configuration->createObjectStorage(context, /* is_readonly */ args.mode != LoadingStrictnessLevel::CREATE),
- args.getContext(), /// Use global context.
- args.table_id,
- args.columns,
- args.constraints,
- args.comment,
- format_settings,
- args.mode,
- /* distributed_processing */ false,
- partition_by);
+ if (cluster_name.empty())
+ cluster_name = context->getSettingsRef()[Setting::object_storage_cluster].value;
+
+ if (cluster_name.empty())
+ {
+ return std::make_shared<StorageObjectStorage>(
+ configuration,
+ // We only want to perform write actions (e.g. create a container in Azure) when the table is being created,
+ // and we want to avoid it when we load the table after a server restart.
+ configuration->createObjectStorage(context, /* is_readonly */ args.mode != LoadingStrictnessLevel::CREATE),
+ args.getContext(), /// Use global context.
+ args.table_id,
+ args.columns,
+ args.constraints,
+ args.comment,
+ format_settings,
+ args.mode,
+ /* distributed_processing */ false,
+ partition_by);
+ }
+ else
+ {
+ return std::make_shared<StorageObjectStorageCluster>(
+ cluster_name,
+ configuration,
+ configuration->createObjectStorage(context, /* is_readonly */ false),
+ args.table_id,
+ args.columns,
+ args.constraints,
+ args.getContext());
+ }
}
#endif
diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py
index c20338f96622..837be9cc6a0f 100644
--- a/tests/integration/test_s3_cluster/test.py
+++ b/tests/integration/test_s3_cluster/test.py
@@ -845,3 +845,70 @@ def test_hive_partitioning(started_cluster):
)
cluster_optimized_traffic = int(cluster_optimized_traffic)
assert cluster_optimized_traffic == optimized_traffic
+
+
+def test_distributed_s3_table_engine(started_cluster):
+ node = started_cluster.instances["s0_0_0"]
+
+ resp_def = node.query(
+ """
+ SELECT * from s3Cluster(
+ 'cluster_simple',
+ 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
+ """
+ )
+
+ node.query("DROP TABLE IF EXISTS single_node");
+ node.query(
+ """
+ CREATE TABLE single_node
+ (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+ ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+ """
+ )
+ query_id_engine_single_node = str(uuid.uuid4())
+ resp_engine_single_node = node.query(
+ """
+ SELECT * FROM single_node ORDER BY (name, value, polygon)
+ """,
+ query_id=query_id_engine_single_node
+ )
+ assert resp_def == resp_engine_single_node
+
+ node.query("DROP TABLE IF EXISTS distributed");
+ node.query(
+ """
+ CREATE TABLE distributed
+ (name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64))))
+ ENGINE=S3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV')
+ SETTINGS object_storage_cluster='cluster_simple'
+ """
+ )
+ query_id_engine_distributed = str(uuid.uuid4())
+ resp_engine_distributed = node.query(
+ """
+ SELECT * FROM distributed ORDER BY (name, value, polygon)
+ """,
+ query_id=query_id_engine_distributed
+ )
+ assert resp_def == resp_engine_distributed
+
+ node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_simple'")
+
+ hosts_engine_single_node = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('cluster_simple', system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_single_node}'
+ """
+ )
+ assert int(hosts_engine_single_node) == 1
+ hosts_engine_distributed = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('cluster_simple', system.query_log)
+ WHERE type='QueryFinish' AND initial_query_id='{query_id_engine_distributed}'
+ """
+ )
+ assert int(hosts_engine_distributed) == 3
diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
index 54e1ced79577..2036de1becd6 100644
--- a/tests/integration/test_storage_azure_blob_storage/test_cluster.py
+++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py
@@ -104,8 +104,57 @@ def test_select_all(cluster):
query_id=query_id_distributed_alt_syntax,
)
print(distributed_azure_alt_syntax)
+ azure_query(
+ node,
+ f"""
+ DROP TABLE IF EXISTS azure_engine_table_single_node;
+ CREATE TABLE azure_engine_table_single_node
+ (key UInt64, data String)
+ ENGINE=AzureBlobStorage(
+ '{storage_account_url}',
+ 'cont',
+ 'test_cluster_select_all.csv',
+ 'devstoreaccount1',
+ 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
+ 'CSV',
+ 'auto'
+ )
+ """,
+ )
+ query_id_engine_single_node = str(uuid.uuid4())
+ azure_engine_single_node = azure_query(
+ node,
+ "SELECT * FROM azure_engine_table_single_node",
+ query_id=query_id_engine_single_node,
+ )
+ azure_query(
+ node,
+ f"""
+ DROP TABLE IF EXISTS azure_engine_table_distributed;
+ CREATE TABLE azure_engine_table_distributed
+ (key UInt64, data String)
+ ENGINE=AzureBlobStorage(
+ '{storage_account_url}',
+ 'cont',
+ 'test_cluster_select_all.csv',
+ 'devstoreaccount1',
+ 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
+ 'CSV',
+ 'auto'
+ )
+ SETTINGS object_storage_cluster='simple_cluster'
+ """,
+ )
+ query_id_engine_distributed = str(uuid.uuid4())
+ azure_engine_distributed = azure_query(
+ node,
+ "SELECT * FROM azure_engine_table_distributed",
+ query_id=query_id_engine_distributed,
+ )
assert TSV(pure_azure) == TSV(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure_alt_syntax)
+ assert TSV(pure_azure) == TSV(azure_engine_single_node)
+ assert TSV(pure_azure) == TSV(azure_engine_distributed)
for _, node_ in cluster.instances.items():
node_.query("SYSTEM FLUSH LOGS")
nodes_pure = node.query(
@@ -135,6 +184,24 @@ def test_select_all(cluster):
""",
)
assert int(nodes_distributed_alt_syntax) == 3
+ nodes_engine_single_node = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('simple_cluster', system.query_log)
+ WHERE type='QueryFinish'
+ AND initial_query_id='{query_id_engine_single_node}'
+ """,
+ )
+ assert int(nodes_engine_single_node) == 1
+ nodes_engine_distributed = node.query(
+ f"""
+ SELECT uniq(hostname)
+ FROM clusterAllReplicas('simple_cluster', system.query_log)
+ WHERE type='QueryFinish'
+ AND initial_query_id='{query_id_engine_distributed}'
+ """,
+ )
+ assert int(nodes_engine_distributed) == 3
def test_count(cluster):
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 9b8cddabb265..9214a3e75a29 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -201,13 +201,17 @@ def get_creation_expression(
table_function=False,
allow_dynamic_metadata_for_data_lakes=False,
run_on_cluster=False,
+ object_storage_cluster=False,
**kwargs,
):
- allow_dynamic_metadata_for_datalakes_suffix = (
- " SETTINGS allow_dynamic_metadata_for_data_lakes = 1"
- if allow_dynamic_metadata_for_data_lakes
- else ""
- )
+ settings_suffix = ""
+ if allow_dynamic_metadata_for_data_lakes or object_storage_cluster:
+ settings = []
+ if allow_dynamic_metadata_for_data_lakes:
+ settings.append("allow_dynamic_metadata_for_data_lakes = 1")
+ if object_storage_cluster:
+ settings.append(f"object_storage_cluster = '{object_storage_cluster}'")
+ settings_suffix = " SETTINGS " + ", ".join(settings)
if storage_type == "s3":
if "bucket" in kwargs:
@@ -227,7 +231,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
- + allow_dynamic_metadata_for_datalakes_suffix
+ + settings_suffix
)
elif storage_type == "azure":
@@ -247,7 +251,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
- + allow_dynamic_metadata_for_datalakes_suffix
+ + settings_suffix
)
elif storage_type == "local":
@@ -263,7 +267,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})"""
- + allow_dynamic_metadata_for_datalakes_suffix
+ + settings_suffix
)
else:
@@ -299,10 +303,18 @@ def create_iceberg_table(
table_name,
cluster,
format="Parquet",
+ object_storage_cluster=False,
**kwargs,
):
node.query(
- get_creation_expression(storage_type, table_name, cluster, format, **kwargs)
+ get_creation_expression(
+ storage_type,
+ table_name,
+ cluster,
+ format,
+ object_storage_cluster=object_storage_cluster,
+ **kwargs,
+ )
)
@@ -621,14 +633,37 @@ def add_df(mode):
.split()
)
+ create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, object_storage_cluster='cluster_simple')
+ query_id_cluster_table_engine = str(uuid.uuid4())
+ select_cluster_table_engine = (
+ instance.query(
+ f"""
+ SELECT * FROM {TABLE_NAME}
+ """,
+ query_id=query_id_cluster_table_engine,
+ )
+ .strip()
+ .split()
+ )
+
+ select_remote_cluster = (
+ instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
+ .strip()
+ .split()
+ )
+
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
assert len(select_cluster_alt_syntax) == 600
+ assert len(select_cluster_table_engine) == 600
+ assert len(select_remote_cluster) == 600
# Actual check
assert select_cluster == select_regular
assert select_cluster_alt_syntax == select_regular
+ assert select_cluster_table_engine == select_regular
+ assert select_remote_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
@@ -674,13 +709,25 @@ def add_df(mode):
)
assert len(cluster_secondary_queries) == 1
- select_remote_cluster = (
- instance.query(f"SELECT * FROM remote('node2',{table_function_expr_cluster})")
- .strip()
- .split()
- )
- assert len(select_remote_cluster) == 600
- assert select_remote_cluster == select_regular
+ for node_name, replica in started_cluster.instances.items():
+ cluster_secondary_queries = (
+ replica.query(
+ f"""
+ SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
+ WHERE
+ type = 'QueryStart'
+ AND NOT is_initial_query
+ AND initial_query_id='{query_id_cluster_table_engine}'
+ """
+ )
+ .strip()
+ .split("\n")
+ )
+
+ logging.info(
+ f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
+ )
+ assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"])
From 7a54424fb56e127cbc6d8ed1341bfd06b2eef49e Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 7 Feb 2025 10:21:36 +0100
Subject: [PATCH 20/45] Fix tests
---
src/Core/SettingsChangesHistory.cpp | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 22541e792191..f613f67ce238 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -155,6 +155,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// Release closed. Please use 25.1
});
addSettingsChanges(settings_changes_history, "24.11",
+ {"object_storage_cluster", "", "", "New setting"},
+ {"object_storage_max_nodes", 0, 0, "New setting"},
+ {"input_format_parquet_use_metadata_cache", false, false, "New setting"},
+ }
+ },
+ {"24.11",
{
{"validate_mutation_query", false, true, "New setting to validate mutation queries by default."},
{"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."},
From fe89fa29d92aa4e8a2bd1f159e9a5d7087a6ae17 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 13 Feb 2025 15:46:43 +0100
Subject: [PATCH 21/45] Fixes after review
---
src/Storages/ObjectStorage/Azure/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/Azure/Configuration.h | 2 +-
src/Storages/ObjectStorage/HDFS/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/HDFS/Configuration.h | 2 +-
src/Storages/ObjectStorage/S3/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/S3/Configuration.h | 2 +-
src/Storages/ObjectStorage/StorageObjectStorage.h | 4 ++--
.../ObjectStorage/StorageObjectStorageCluster.cpp | 13 ++++++++++---
.../ObjectStorage/StorageObjectStorageCluster.h | 12 ++++++++++++
9 files changed, 30 insertions(+), 11 deletions(-)
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index 1579c4c52a7d..da0c4d475e05 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -460,7 +460,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageAzureConfiguration::setFunctionArgs(ASTs & args) const
+void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
{ /// Just check
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 5ab8c3d71455..a3de3a9377f4 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void setFunctionArgs(ASTs & args) const override;
+ void getTableFunctionArguments(ASTs & args) const override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
index 83e480b09d50..3c043d5ec3bd 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
@@ -236,7 +236,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageHDFSConfiguration::setFunctionArgs(ASTs & args) const
+void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
{ /// Just check
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h
index dbef04aca4aa..75c570901270 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.h
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.h
@@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void setFunctionArgs(ASTs & args) const override;
+ void getTableFunctionArguments(ASTs & args) const override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index 63b618f3a151..e434fe2e0a0e 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -586,7 +586,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageS3Configuration::setFunctionArgs(ASTs & args) const
+void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
{ /// Just check
diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h
index 46a54f4490e0..9412f6813bf2 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.h
+++ b/src/Storages/ObjectStorage/S3/Configuration.h
@@ -97,7 +97,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void setFunctionArgs(ASTs & args) const override;
+ void getTableFunctionArguments(ASTs & args) const override;
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 4ac5fc4aafef..5aad2fb79f00 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -253,9 +253,9 @@ class StorageObjectStorage::Configuration
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
- virtual void setFunctionArgs(ASTs & /* args */) const
+ virtual void getTableFunctionArguments(ASTs & /* args */) const
{
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method setFunctionArgs is not supported by storage {}", getEngineName());
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getTableFunctionArguments is not supported by storage {}", getEngineName());
}
protected:
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index a0a787a5e7d1..fe04be5e6fab 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -108,6 +108,13 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
return;
auto * tables = select_query->tables()->as<ASTTablesInSelectQuery>();
+
+ if (tables->children.empty())
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "Expected SELECT query from table with engine {}, got '{}'",
+ configuration->getEngineName(), queryToString(query));
+
auto * table_expression = tables->children[0]->as<ASTTablesInSelectQueryElement>()->table_expression->as<ASTTableExpression>();
if (!table_expression->database_and_table_name)
return;
@@ -116,7 +123,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
auto table_alias = table_identifier_typed.tryGetAlias();
- std::unordered_map<std::string, std::string> engine_to_function = {
+ static std::unordered_map<std::string, std::string> engine_to_function = {
{"S3", "s3"},
{"Azure", "azureBlobStorage"},
{"HDFS", "hdfs"},
@@ -153,7 +160,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
queryToString(query));
}
- configuration->setFunctionArgs(arguments->children);
+ configuration->getTableFunctionArguments(arguments->children);
function_ast->arguments = arguments;
function_ast->children.push_back(arguments);
@@ -163,7 +170,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
table_expression->database_and_table_name = nullptr;
table_expression->table_function = function_ast_ptr;
- table_expression->children[0].swap(function_ast_ptr);
+ table_expression->children[0] = function_ast_ptr;
auto settings = select_query->settings();
if (settings)
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 941566a6d12c..89e7f8e827a1 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -37,6 +37,18 @@ class StorageObjectStorageCluster : public IStorageCluster
const StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context) override;
+ /*
+ In case the table was created with the `object_storage_cluster` setting,
+ modify the AST query object so that it uses the table function implementation
+ by mapping the engine name to the table function name and setting `object_storage_cluster`.
+ For a table like
+ CREATE TABLE table ENGINE=S3(...) SETTINGS object_storage_cluster='cluster'
+ it converts the query
+ SELECT * FROM table
+ to
+ SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster'
+ to make a distributed request over cluster 'cluster'.
+ */
void updateQueryForDistributedEngineIfNeeded(ASTPtr & query);
const String engine_name;
From 536c4d2729bae4d66e024d528373b68cd584cab4 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 14 Feb 2025 12:58:15 +0100
Subject: [PATCH 22/45] More fixes after review
---
src/Databases/Iceberg/DatabaseIceberg.cpp | 2 --
.../ObjectStorage/Azure/Configuration.cpp | 28 ++-----------------
.../ObjectStorage/Azure/Configuration.h | 3 +-
.../ObjectStorage/HDFS/Configuration.cpp | 2 --
.../ObjectStorage/S3/Configuration.cpp | 2 --
.../StorageObjectStorageCluster.cpp | 4 +++
.../registerStorageObjectStorage.cpp | 3 --
7 files changed, 9 insertions(+), 35 deletions(-)
diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp
index 692a138e46a2..a446764b7666 100644
--- a/src/Databases/Iceberg/DatabaseIceberg.cpp
+++ b/src/Databases/Iceberg/DatabaseIceberg.cpp
@@ -234,8 +234,6 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, storage_settings.get());
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
- if (cluster_name.empty())
- cluster_name = context_->getSettingsRef()[Setting::object_storage_cluster].value;
if (cluster_name.empty())
{
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index da0c4d475e05..a8e4fe092342 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -133,8 +133,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
String connection_url;
String container_name;
- std::optional<String> account_name;
- std::optional<String> account_key;
if (collection.has("connection_string"))
connection_url = collection.get("connection_string");
@@ -155,14 +153,6 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto"));
blobs_paths = {blob_path};
- if (account_name && account_key)
- {
- if (saved_params.empty())
- {
- saved_params.push_back(*account_name);
- saved_params.push_back(*account_key);
- }
- }
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}
@@ -186,9 +176,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
String container_name = checkAndGetLiteralArgument(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath");
- std::optional<String> account_name;
- std::optional<String> account_key;
-
auto is_format_arg = [] (const std::string & s) -> bool
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(Poco::toLower(s));
@@ -287,14 +274,6 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
blobs_paths = {blob_path};
- if (account_name && account_key)
- {
- if (saved_params.empty())
- {
- saved_params.push_back(*account_name);
- saved_params.push_back(*account_key);
- }
- }
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
}
@@ -463,16 +442,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
- { /// Just check
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
- }
args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.container_name));
args.push_back(std::make_shared<ASTLiteral>(blob_path));
- for (const auto & arg : saved_params)
+ if (account_name && account_key)
{
- args.push_back(std::make_shared<ASTLiteral>(arg));
+ args.push_back(std::make_shared<ASTLiteral>(*account_name));
+ args.push_back(std::make_shared<ASTLiteral>(*account_key));
}
}
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index a3de3a9377f4..7d93acf1701c 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -88,7 +88,8 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
std::string blob_path;
std::vector blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
- std::vector<String> saved_params;
+ std::optional<String> account_name;
+ std::optional<String> account_key;
};
}
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
index 3c043d5ec3bd..d03d2f8ca0fc 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
@@ -239,9 +239,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
- { /// Just check
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
- }
args.push_back(std::make_shared<ASTLiteral>(url + path));
}
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index e434fe2e0a0e..1487077597a6 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -589,9 +589,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const
{
if (!args.empty())
- { /// Just check
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
- }
args.push_back(std::make_shared<ASTLiteral>(url.uri_str));
if (auth_settings[S3AuthSetting::no_sign_request])
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index fe04be5e6fab..9c0d864cf600 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -116,6 +116,10 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
configuration->getEngineName(), queryToString(query));
auto * table_expression = tables->children[0]->as()->table_expression->as();
+
+ if (!table_expression)
+ return;
+
if (!table_expression->database_and_table_name)
return;
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index 2e93f16e09b7..b186f7bf745f 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -75,9 +75,6 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
- if (cluster_name.empty())
- cluster_name = context->getSettingsRef()[Setting::object_storage_cluster].value;
-
if (cluster_name.empty())
{
return std::make_shared(
From 50fc94fe70e2c4ace359f392905197f6a06b06b1 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 14 Feb 2025 13:41:09 +0100
Subject: [PATCH 23/45] Rename getTableFunctionArguments to
addPathAndAccessKeysToArgs
---
src/Storages/ObjectStorage/Azure/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/Azure/Configuration.h | 2 +-
src/Storages/ObjectStorage/HDFS/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/HDFS/Configuration.h | 2 +-
src/Storages/ObjectStorage/S3/Configuration.cpp | 2 +-
src/Storages/ObjectStorage/S3/Configuration.h | 2 +-
src/Storages/ObjectStorage/StorageObjectStorage.h | 5 +++--
src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 2 +-
8 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index a8e4fe092342..8124950b7c40 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -439,7 +439,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageAzureConfiguration::getTableFunctionArguments(ASTs & args) const
+void StorageAzureConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const
{
if (!args.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 7d93acf1701c..08b5568baf95 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void getTableFunctionArguments(ASTs & args) const override;
+ void addPathAndAccessKeysToArgs(ASTs & args) const override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
index d03d2f8ca0fc..071aac93a588 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
@@ -236,7 +236,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageHDFSConfiguration::getTableFunctionArguments(ASTs & args) const
+void StorageHDFSConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const
{
if (!args.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h
index 75c570901270..5d009e41d16f 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.h
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.h
@@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void getTableFunctionArguments(ASTs & args) const override;
+ void addPathAndAccessKeysToArgs(ASTs & args) const override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index 1487077597a6..338182de8fea 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -586,7 +586,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageS3Configuration::getTableFunctionArguments(ASTs & args) const
+void StorageS3Configuration::addPathAndAccessKeysToArgs(ASTs & args) const
{
if (!args.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h
index 9412f6813bf2..7d339dff45f2 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.h
+++ b/src/Storages/ObjectStorage/S3/Configuration.h
@@ -97,7 +97,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void getTableFunctionArguments(ASTs & args) const override;
+ void addPathAndAccessKeysToArgs(ASTs & args) const override;
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 5aad2fb79f00..6ee2de0dac50 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -253,9 +253,10 @@ class StorageObjectStorage::Configuration
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
- virtual void getTableFunctionArguments(ASTs & /* args */) const
+ /// Add path and access arguments to the AST arguments during conversion from table engine to table function
+ virtual void addPathAndAccessKeysToArgs(ASTs & /* args */) const
{
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getTableFunctionArguments is not supported by storage {}", getEngineName());
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method addPathAndAccessKeysToArgs is not supported by storage {}", getEngineName());
}
protected:
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 9c0d864cf600..b54e4ad45cd2 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -164,7 +164,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
queryToString(query));
}
- configuration->getTableFunctionArguments(arguments->children);
+ configuration->addPathAndAccessKeysToArgs(arguments->children);
function_ast->arguments = arguments;
function_ast->children.push_back(arguments);
From 5cb7da7c1611ed8f4d6d781a6d7c61ac92bc1e80 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Fri, 14 Feb 2025 14:33:01 +0100
Subject: [PATCH 24/45] More refactoring
---
.../ObjectStorage/Azure/Configuration.cpp | 17 ++++++++-------
.../ObjectStorage/Azure/Configuration.h | 2 +-
.../ObjectStorage/HDFS/Configuration.cpp | 9 ++++----
.../ObjectStorage/HDFS/Configuration.h | 2 +-
.../ObjectStorage/S3/Configuration.cpp | 21 ++++++++++---------
src/Storages/ObjectStorage/S3/Configuration.h | 2 +-
.../ObjectStorage/StorageObjectStorage.h | 6 +++---
.../StorageObjectStorageCluster.cpp | 7 ++-----
8 files changed, 32 insertions(+), 34 deletions(-)
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index 8124950b7c40..de4f2d953aaa 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -439,19 +439,20 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageAzureConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const
+ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const
{
- if (!args.empty())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
+ auto arguments = std::make_shared<ASTExpressionList>();
- args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
- args.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.container_name));
- args.push_back(std::make_shared<ASTLiteral>(blob_path));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.storage_account_url));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(connection_params.endpoint.container_name));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(blob_path));
if (account_name && account_key)
{
- args.push_back(std::make_shared<ASTLiteral>(*account_name));
- args.push_back(std::make_shared<ASTLiteral>(*account_key));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(*account_name));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(*account_key));
}
+
+ return arguments;
}
}
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 08b5568baf95..c915696f2448 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -79,7 +79,7 @@ class StorageAzureConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void addPathAndAccessKeysToArgs(ASTs & args) const override;
+ ASTPtr createArgsWithAccessData() const override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
index 071aac93a588..304655dc700c 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp
@@ -236,12 +236,11 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageHDFSConfiguration::addPathAndAccessKeysToArgs(ASTs & args) const
+ASTPtr StorageHDFSConfiguration::createArgsWithAccessData() const
{
- if (!args.empty())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
-
- args.push_back(std::make_shared<ASTLiteral>(url + path));
+ auto arguments = std::make_shared<ASTExpressionList>();
+ arguments->children.push_back(std::make_shared<ASTLiteral>(url + path));
+ return arguments;
}
}
diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h
index 5d009e41d16f..f38382e173ed 100644
--- a/src/Storages/ObjectStorage/HDFS/Configuration.h
+++ b/src/Storages/ObjectStorage/HDFS/Configuration.h
@@ -65,7 +65,7 @@ class StorageHDFSConfiguration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void addPathAndAccessKeysToArgs(ASTs & args) const override;
+ ASTPtr createArgsWithAccessData() const override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp
index 338182de8fea..099a96700f68 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.cpp
+++ b/src/Storages/ObjectStorage/S3/Configuration.cpp
@@ -586,27 +586,28 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
}
-void StorageS3Configuration::addPathAndAccessKeysToArgs(ASTs & args) const
+ASTPtr StorageS3Configuration::createArgsWithAccessData() const
{
- if (!args.empty())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not empty");
+ auto arguments = std::make_shared<ASTExpressionList>();
- args.push_back(std::make_shared<ASTLiteral>(url.uri_str));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(url.uri_str));
if (auth_settings[S3AuthSetting::no_sign_request])
{
- args.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
+ arguments->children.push_back(std::make_shared<ASTLiteral>("NOSIGN"));
}
else
{
- args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
- args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::access_key_id].value));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
if (!auth_settings[S3AuthSetting::session_token].value.empty())
- args.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
if (format != "auto")
- args.push_back(std::make_shared<ASTLiteral>(format));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(format));
if (!compression_method.empty())
- args.push_back(std::make_shared<ASTLiteral>(compression_method));
+ arguments->children.push_back(std::make_shared<ASTLiteral>(compression_method));
}
+
+ return arguments;
}
}
diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h
index 7d339dff45f2..c9e9ffd7b8fc 100644
--- a/src/Storages/ObjectStorage/S3/Configuration.h
+++ b/src/Storages/ObjectStorage/S3/Configuration.h
@@ -97,7 +97,7 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
ContextPtr context,
bool with_structure) override;
- void addPathAndAccessKeysToArgs(ASTs & args) const override;
+ ASTPtr createArgsWithAccessData() const override;
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 6ee2de0dac50..2eba0a2cf260 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -253,10 +253,10 @@ class StorageObjectStorage::Configuration
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
- /// Add path and access arguments to the AST arguments during conversion from table engine to table function
- virtual void addPathAndAccessKeysToArgs(ASTs & /* args */) const
+ /// Create arguments for table function with path and access parameters
+ virtual ASTPtr createArgsWithAccessData() const
{
- throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method addPathAndAccessKeysToArgs is not supported by storage {}", getEngineName());
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method createArgsWithAccessData is not supported by storage {}", getEngineName());
}
protected:
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index b54e4ad45cd2..380deecb283d 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -152,7 +152,6 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
auto function_ast = std::make_shared<ASTFunction>();
function_ast->name = table_function_name;
- auto arguments = std::make_shared<ASTExpressionList>();
auto cluster_name = getClusterName();
@@ -164,10 +163,8 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
queryToString(query));
}
- configuration->addPathAndAccessKeysToArgs(arguments->children);
-
- function_ast->arguments = arguments;
- function_ast->children.push_back(arguments);
+ function_ast->arguments = configuration->createArgsWithAccessData();
+ function_ast->children.push_back(function_ast->arguments);
function_ast->setAlias(table_alias);
ASTPtr function_ast_ptr(function_ast);
From b98865a580ff3830aeec2ca8ad9e75fb4ae08f5e Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Tue, 18 Feb 2025 11:13:56 +0100
Subject: [PATCH 25/45] Add ability to choose object storage cluster in select
query
---
src/Databases/Iceberg/DatabaseIceberg.cpp | 40 ++----
src/Storages/IStorageCluster.cpp | 36 +++++-
src/Storages/IStorageCluster.h | 21 +++-
.../StorageObjectStorageCluster.cpp | 54 +++++++-
.../StorageObjectStorageCluster.h | 27 +++-
.../registerStorageObjectStorage.cpp | 47 +++----
.../TableFunctionObjectStorageCluster.cpp | 8 +-
.../integration/test_storage_iceberg/test.py | 116 +++++++++---------
8 files changed, 212 insertions(+), 137 deletions(-)
diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp
index a446764b7666..790c35a54efe 100644
--- a/src/Databases/Iceberg/DatabaseIceberg.cpp
+++ b/src/Databases/Iceberg/DatabaseIceberg.cpp
@@ -43,7 +43,6 @@ namespace DatabaseIcebergSetting
namespace Setting
{
extern const SettingsBool allow_experimental_database_iceberg;
- extern const SettingsString object_storage_cluster;
}
namespace ErrorCodes
@@ -235,33 +234,18 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
auto cluster_name = settings[DatabaseIcebergSetting::object_storage_cluster].value;
- if (cluster_name.empty())
- {
- return std::make_shared<StorageObjectStorage>(
- configuration,
- configuration->createObjectStorage(context_, /* is_readonly */ false),
- context_,
- StorageID(getDatabaseName(), name),
- /* columns */columns,
- /* constraints */ConstraintsDescription{},
- /* comment */"",
- getFormatSettings(context_),
- LoadingStrictnessLevel::CREATE,
- /* distributed_processing */false,
- /* partition_by */nullptr,
- /* lazy_init */true);
- }
- else
- {
- return std::make_shared<StorageObjectStorageCluster>(
- cluster_name,
- configuration,
- configuration->createObjectStorage(context_, /* is_readonly */ false),
- StorageID(getDatabaseName(), name),
- columns,
- ConstraintsDescription{},
- context_);
- }
+ return std::make_shared<StorageObjectStorageCluster>(
+ cluster_name,
+ configuration,
+ configuration->createObjectStorage(context_, /* is_readonly */ false),
+ context_,
+ StorageID(getDatabaseName(), name),
+ /* columns */columns,
+ /* constraints */ConstraintsDescription{},
+ /* comment */"",
+ /* format_settings */ getFormatSettings(context_),
+ /* mode */ LoadingStrictnessLevel::CREATE,
+ /* partition_by */nullptr);
}
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index 28b5a84166a2..84885032dda9 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -36,6 +36,11 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards;
}
+namespace ErrorCodes
+{
+ extern const int NOT_IMPLEMENTED;
+}
+
IStorageCluster::IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
@@ -65,6 +70,19 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
extension = storage->getTaskIteratorExtension(predicate, context);
}
+void IStorageCluster::readFallBackToPure(
+ QueryPlan & /*query_plan*/,
+ const Names & /*column_names*/,
+ const StorageSnapshotPtr & /*storage_snapshot*/,
+ SelectQueryInfo & /*query_info*/,
+ ContextPtr /*context*/,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ size_t /*max_block_size*/,
+ size_t /*num_streams*/)
+{
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
+}
+
/// The code executes on initiator
void IStorageCluster::read(
QueryPlan & query_plan,
@@ -73,13 +91,21 @@ void IStorageCluster::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/)
+ size_t max_block_size,
+ size_t num_streams)
{
+ auto cluster_name_ = getClusterName(context);
+
+ if (cluster_name_.empty())
+ {
+ readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+ return;
+ }
+
storage_snapshot->check(column_names);
updateBeforeRead(context);
- auto cluster = getCluster(context);
+ auto cluster = getClusterImpl(context, cluster_name_);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
@@ -196,9 +222,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}
-ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
+ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_)
{
- return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
+ return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
}
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 3bcc467e3135..c6546c20957e 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -29,10 +29,10 @@ class IStorageCluster : public IStorage
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/) override;
+ size_t max_block_size,
+ size_t num_streams) override;
- ClusterPtr getCluster(ContextPtr context) const;
+ ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
@@ -43,13 +43,26 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
- const String & getClusterName() const { return cluster_name; }
+ const String & getOriginalClusterName() const { return cluster_name; }
+ virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
protected:
virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
+ virtual void readFallBackToPure(
+ QueryPlan & query_plan,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams);
+
private:
+ static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_);
+
LoggerPtr log;
String cluster_name;
};
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
index 380deecb283d..431a5fc67bee 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
@@ -23,6 +23,7 @@ namespace DB
namespace Setting
{
extern const SettingsBool use_hive_partitioning;
+ extern const SettingsString object_storage_cluster;
}
namespace ErrorCodes
@@ -63,15 +64,24 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
const String & cluster_name_,
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
+ ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
- ContextPtr context_)
+ const String & comment_,
+ std::optional<FormatSettings> format_settings_,
+ LoadingStrictnessLevel mode_,
+ ASTPtr partition_by_
+)
: IStorageCluster(
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
, configuration{configuration_}
, object_storage(object_storage_)
, cluster_name_in_settings(false)
+ , comment(comment_)
+ , format_settings(format_settings_)
+ , mode(mode_)
+ , partition_by(partition_by_)
{
ColumnsDescription columns{columns_};
std::string sample_path;
@@ -94,7 +104,7 @@ std::string StorageObjectStorageCluster::getName() const
return configuration->getEngineName();
}
-void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query)
+void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context)
{
// Change table engine on table function for distributed request
// CREATE TABLE t (...) ENGINE=IcebergS3(...)
@@ -131,6 +141,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
{"S3", "s3"},
{"Azure", "azureBlobStorage"},
{"HDFS", "hdfs"},
+ {"Iceberg", "icebergS3"},
{"IcebergS3", "icebergS3"},
{"IcebergAzure", "icebergAzure"},
{"IcebergHDFS", "icebergHDFS"},
@@ -153,7 +164,7 @@ void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr
auto function_ast = std::make_shared();
function_ast->name = table_function_name;
- auto cluster_name = getClusterName();
+ auto cluster_name = getClusterName(context);
if (cluster_name.empty())
{
@@ -195,7 +206,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
const DB::StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context)
{
- updateQueryForDistributedEngineIfNeeded(query);
+ updateQueryForDistributedEngineIfNeeded(query, context);
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
@@ -247,4 +258,39 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
+void StorageObjectStorageCluster::readFallBackToPure(
+ QueryPlan & query_plan,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams)
+{
+ if (!pure_storage)
+ pure_storage = std::make_shared<StorageObjectStorage>(
+ configuration,
+ object_storage,
+ context,
+ getStorageID(),
+ getInMemoryMetadata().getColumns(),
+ getInMemoryMetadata().getConstraints(),
+ comment,
+ format_settings,
+ mode,
+ /* distributed_processing */false,
+ partition_by);
+
+ pure_storage->read(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+}
+
+String StorageObjectStorageCluster::getClusterName(ContextPtr context) const
+{
+ auto cluster_name_ = context->getSettingsRef()[Setting::object_storage_cluster].value;
+ if (cluster_name_.empty())
+ cluster_name_ = getOriginalClusterName();
+ return cluster_name_;
+}
+
}
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 89e7f8e827a1..6f04e4e7d443 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -17,10 +17,15 @@ class StorageObjectStorageCluster : public IStorageCluster
const String & cluster_name_,
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
+ ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
- ContextPtr context_);
+ const String & comment_,
+ std::optional<FormatSettings> format_settings_,
+ LoadingStrictnessLevel mode_,
+ ASTPtr partition_by_ = nullptr
+ );
std::string getName() const override;
@@ -31,12 +36,24 @@ class StorageObjectStorageCluster : public IStorageCluster
void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; }
+ String getClusterName(ContextPtr context) const override;
+
private:
void updateQueryToSendIfNeeded(
ASTPtr & query,
const StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context) override;
+ void readFallBackToPure(
+ QueryPlan & query_plan,
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams) override;
+
/*
In case the table was created with the `object_storage_cluster` setting,
modify the AST query object so that it uses the table function implementation
@@ -49,12 +66,18 @@ class StorageObjectStorageCluster : public IStorageCluster
SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster'
to make a distributed request over cluster 'cluster'.
*/
- void updateQueryForDistributedEngineIfNeeded(ASTPtr & query);
+ void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context);
const String engine_name;
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
bool cluster_name_in_settings;
+
+ std::shared_ptr<StorageObjectStorage> pure_storage;
+ String comment;
+ std::optional<FormatSettings> format_settings;
+ LoadingStrictnessLevel mode;
+ ASTPtr partition_by;
};
}
diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
index b186f7bf745f..ec03b47498a9 100644
--- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp
@@ -20,11 +20,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
-namespace Setting
-{
- extern const SettingsString object_storage_cluster;
-}
-
namespace StorageObjectStorageSetting
{
extern const StorageObjectStorageSettingsString object_storage_cluster;
@@ -75,34 +70,20 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
- if (cluster_name.empty())
- {
- return std::make_shared<StorageObjectStorage>(
- configuration,
- // We only want to perform write actions (e.g. create a container in Azure) when the table is being created,
- // and we want to avoid it when we load the table after a server restart.
- configuration->createObjectStorage(context, /* is_readonly */ args.mode != LoadingStrictnessLevel::CREATE),
- args.getContext(), /// Use global context.
- args.table_id,
- args.columns,
- args.constraints,
- args.comment,
- format_settings,
- args.mode,
- /* distributed_processing */ false,
- partition_by);
- }
- else
- {
- return std::make_shared