diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 60b74191ad111..0e73fc783694f 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -312,16 +312,25 @@ def to_timestamp_ltz(*args) -> Expression:
Supported functions:
1. to_timestamp_ltz(Numeric) -> DataTypes.TIMESTAMP_LTZ
+ Converts a numeric value of epoch milliseconds to a TIMESTAMP_LTZ. The default precision is 3.
2. to_timestamp_ltz(Numeric, Integer) -> DataTypes.TIMESTAMP_LTZ
+ Converts a numeric value of epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ.
+ Valid precisions are 0 or 3.
3. to_timestamp_ltz(String) -> DataTypes.TIMESTAMP_LTZ
+ Converts a timestamp string using default format 'yyyy-MM-dd HH:mm:ss.SSS' to a TIMESTAMP_LTZ.
4. to_timestamp_ltz(String, String) -> DataTypes.TIMESTAMP_LTZ
+ Converts a timestamp string using format (default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ.
5. to_timestamp_ltz(String, String, String) -> DataTypes.TIMESTAMP_LTZ
+ Converts a timestamp string string1 using format string2 (default 'yyyy-MM-dd HH:mm:ss.SSS')
+ in time zone string3 (default 'UTC') to a TIMESTAMP_LTZ.
+ Supports any timezone that is available in Java's TimeZone database.
Example:
::
>>> table.select(to_timestamp_ltz(100)) # numeric with default precision
>>> table.select(to_timestamp_ltz(100, 0)) # numeric with second precision
+ >>> table.select(to_timestamp_ltz(100, 3)) # numeric with millisecond precision
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00")) # string with default format
>>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy")) # string with format
>>> table.select(to_timestamp_ltz("2023-01-01 00:00:00",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 4b34d06c4d32c..84365f5f5edb6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -381,7 +381,7 @@ public static ApiExpression toTimestampLtz(String timestampStr, String format) {
/**
* Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}.
*
- *
This method takes an object representing a timestamp and converts it to a TIMESTAMP_LTZ
+ *
This method takes a string representing a timestamp and converts it to a TIMESTAMP_LTZ
* using the built-in TO_TIMESTAMP_LTZ function definition.
*
* @param timeStamp The timestamp string to be converted.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
index 5a80d30f42edf..70483b3d830a2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
@@ -57,7 +57,7 @@ public Optional inferType(CallContext callContext) {
if (!isCharacterType(firstTypeRoot) && !firstType.is(LogicalTypeFamily.NUMERIC)) {
throw new ValidationException(
"Unsupported argument type. "
- + "When taking 1 argument, TO_TIMESTAMP_LTZ accepts or .");
+ + "When taking 1 argument, TO_TIMESTAMP_LTZ accepts an argument of type , , or .");
}
} else if (argCount == 2) {
LogicalType secondType = argumentTypes.get(1).getLogicalType();
@@ -72,12 +72,12 @@ public Optional inferType(CallContext callContext) {
if (!isCharacterType(secondTypeRoot)) {
throw new ValidationException(
"Unsupported argument type. "
- + "TO_TIMESTAMP_LTZ(, ) requires the second argument to be .");
+ + "If the first argument is of type or , TO_TIMESTAMP_LTZ requires the second argument to be of type or .");
}
} else {
throw new ValidationException(
"Unsupported argument type. "
- + "When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be or .");
+ + "When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type , , or .");
}
} else if (argCount == 3) {
if (!isCharacterType(firstTypeRoot)
@@ -85,7 +85,7 @@ public Optional inferType(CallContext callContext) {
|| !isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
throw new ValidationException(
"Unsupported argument type. "
- + "When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type .");
+ + "When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type or .");
}
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
index 2f0591365497d..fab17fde085ba 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
@@ -44,7 +44,7 @@ protected Stream testData() {
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.inputTypes(DataTypes.BOOLEAN())
.expectErrorMessage(
- "Unsupported argument type. When taking 1 argument, TO_TIMESTAMP_LTZ accepts or ."),
+ "Unsupported argument type. When taking 1 argument, TO_TIMESTAMP_LTZ accepts an argument of type , , or ."),
TestSpec.forStrategy(
"TO_TIMESTAMP_LTZ(, )",
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
@@ -66,13 +66,13 @@ protected Stream testData() {
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.inputTypes(DataTypes.STRING(), DataTypes.FLOAT())
.expectErrorMessage(
- "Unsupported argument type. TO_TIMESTAMP_LTZ(, ) requires the second argument to be ."),
+ "Unsupported argument type. If the first argument is of type or , TO_TIMESTAMP_LTZ requires the second argument to be of type or ."),
TestSpec.forStrategy(
"Invalid first argument when taking 2 arguments",
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.inputTypes(DataTypes.BOOLEAN(), DataTypes.FLOAT())
.expectErrorMessage(
- "Unsupported argument type. When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be or ."),
+ "Unsupported argument type. When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type , , or ."),
TestSpec.forStrategy(
"TO_TIMESTAMP_LTZ(, , )",
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
@@ -82,7 +82,7 @@ protected Stream testData() {
"Invalid three arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.inputTypes(DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING())
.expectErrorMessage(
- "Unsupported argument type. When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type ."),
+ "Unsupported argument type. When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type or ."),
TestSpec.forStrategy("No arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
.inputTypes()
.expectErrorMessage(