diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex index 7d43edd..22d1f99 100644 --- a/lib/ch/row_binary.ex +++ b/lib/ch/row_binary.ex @@ -426,7 +426,9 @@ defmodule Ch.RowBinary do _ when is_integer(value) -> [0x0A | encode(:i64, value)] _ when is_float(value) -> [0x0E | encode(:f64, value)] %Date{} -> [0x0F | encode(:date, value)] + %DateTime{} -> [0x11 | encode(:datetime, value)] %NaiveDateTime{} -> [0x11 | encode(:datetime, value)] + %{} -> [0x30, 0x00, 0x80, 0x08, 0x20, 0x00, 0x00, 0x00 | encode(:json, value)] [] -> [0x1E, 0x00] end end @@ -1136,6 +1138,74 @@ defmodule Ch.RowBinary do decode_dynamic_continue(rest, [:low_cardinality | dynamic], types_rest, row, rows, types) end + # JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp) 0x30......... + defp decode_dynamic(<<0x30, rest::bytes>>, dynamic, types_rest, row, rows, types) do + # Assert uint8_serialization_version to be 0 + <<0x00, rest::bytes>> = rest + + # Skip var_int_max_dynamic_paths + {_paths, rest} = read_varint(rest) + + # Skip uint8_max_dynamic_types + <<_val, rest::bytes>> = rest + + # Read var_uint_number_of_typed_paths + {typed_paths, rest} = read_varint(rest) + + # Skip `typed_paths` typed paths + rest = + Enum.reduce(1..typed_paths//1, rest, fn _, rest -> + {count, rest} = read_varint(rest) + <<_discard::size(count)-bytes, rest::bytes>> = rest + skip_type(rest) + end) + + # Read var_uint_number_of_skip_paths + {skip_paths, rest} = read_varint(rest) + + # Skip `skip_paths` skipped paths + rest = + Enum.reduce(1..skip_paths//1, rest, fn _, rest -> + {count, rest} = read_varint(rest) + <<_discard::size(count)-bytes, rest::bytes>> = rest + rest + end) + + # Read var_uint_number_of_skip_path_regexps + {skip_path_regexes, rest} = read_varint(rest) + + # Skip `skip_path_regexes` skipped paths regex + rest = + Enum.reduce(1..skip_path_regexes//1, rest, fn _, rest -> + {count, rest} = read_varint(rest) + <<_discard::size(count)-bytes, 
rest::bytes>> = rest + rest + end) + + decode_dynamic_continue(rest, [:json | dynamic], types_rest, row, rows, types) + end + + for {pattern, value} <- varints do + defp read_varint(<<unquote(pattern), rest::bytes>>), do: {unquote(value), rest} + end + + other_dynamic_types = [ + datetime: 0x11, + set: 0x21, + bfloat16: 0x31, + time: 0x32 + ] + + # Consume a type header from binary input, returning the rest. + # TODO: Only supports single-byte type headers for now. + def skip_type(<<type, rest::bytes>>) + when type in unquote(Keyword.values(dynamic_types ++ other_dynamic_types)), do: rest + + def skip_type(<<type, _rest::bytes>>) do + raise ArgumentError, + "Unsupported type definition (starting with 0x#{Base.encode16(<<type>>)}) while decoding dynamic JSON. Only single-byte type identifiers are currently supported." + end + # TODO # Enum8 0x17 ... # Enum16 0x18 ...> @@ -1151,7 +1221,6 @@ defmodule Ch.RowBinary do # Custom type (Ring, Polygon, etc) 0x2C # SimpleAggregateFunction(function_name(param_1, ..., param_N), arg_T1, ..., arg_TN) 0x2E...... (see aggregate function parameter binary encoding) # Nested(name1 T1, ..., nameN TN) 0x2F... - # JSON(max_dynamic_paths=N, max_dynamic_types=M, path Type, SKIP skip_path, SKIP REGEXP skip_path_regexp) 0x30......... 
unsupported_dynamic_types = %{ "Enum8" => 0x17, @@ -1167,8 +1236,7 @@ defmodule Ch.RowBinary do "Dynamic" => 0x2B, "CustomType" => 0x2C, "SimpleAggregateFunction" => 0x2E, - "Nested" => 0x2F, - "JSON" => 0x30 + "Nested" => 0x2F } for {type, code} <- unsupported_dynamic_types do diff --git a/test/ch/json_test.exs b/test/ch/json_test.exs index b41a82d..57b1841 100644 --- a/test/ch/json_test.exs +++ b/test/ch/json_test.exs @@ -345,18 +345,76 @@ defmodule Ch.JSONTest do ] ] - # TODO - assert_raise ArgumentError, "unsupported dynamic type JSON", fn -> - Ch.query!(conn, "SELECT json.a.b, dynamicType(json.a.b) FROM json_test;", [], query_options) - end + assert Ch.query!(conn, "SELECT json.a.b FROM json_test;", [], query_options).rows == [ + [ + [ + %{"c" => 42, "d" => "Hello", "f" => [[%{"g" => 42.42}]], "k" => %{"j" => 1000}}, + %{"c" => 43}, + %{ + "d" => "My", + "e" => [1, 2, 3], + "f" => [[%{"g" => 43.43, "h" => "2020-01-01"}]], + "k" => %{"j" => 2000} + } + ] + ], + [[1, 2, 3]], + [ + [ + %{"c" => 44, "f" => [[%{"h" => "2020-01-02"}]]}, + %{ + "d" => "World", + "e" => [4, 5, 6], + "f" => [[%{"g" => 44.44}]], + "k" => %{"j" => 3000} + } + ] + ] + ] - assert_raise ArgumentError, "unsupported dynamic type JSON", fn -> - Ch.query!( - conn, - "SELECT json.a.b.:`Array(JSON)`.c, json.a.b.:`Array(JSON)`.f, json.a.b.:`Array(JSON)`.d FROM json_test;", - [], - query_options - ) - end + assert Ch.query!( + conn, + "SELECT json.a.b[].c, json.a.b[].f, json.a.b[].d FROM json_test;", + [], + query_options + ).rows == [ + [ + [42, 43, nil], + [[[%{"g" => 42.42}]], nil, [[%{"g" => 43.43, "h" => "2020-01-01"}]]], + ["Hello", nil, "My"] + ], + [[], [], []], + [[44, nil], [[[%{"h" => "2020-01-02"}]], [[%{"g" => 44.44}]]], [nil, "World"]] + ] + + query_options = Keyword.put(query_options, :enable_time_time64_type, 1) + + assert_raise ArgumentError, + "Unsupported type definition (starting with 0x34) while decoding dynamic JSON. 
Only single-byte type identifiers are currently supported.", + fn -> + Ch.query!( + conn, + ~s|SELECT '{"a": "10:00:00.050"}'::JSON(a Time64)::Dynamic;|, + [], + query_options + ) + end + end + + test "encode JSON in dynamic column", %{conn: conn, query_options: query_options} do + Ch.query!(conn, "CREATE TABLE json_test (value Dynamic) ENGINE = Memory;", [], query_options) + + query_options = Keyword.put(query_options, :types, [:dynamic]) + + Ch.query!( + conn, + "INSERT INTO json_test (value) FORMAT RowBinary", + [[%{"json_obj" => 42}]], + query_options + ) + + assert Ch.query!(conn, "SELECT value FROM json_test").rows == [ + [%{"json_obj" => 42}] + ] end end