Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support query distributed table with JSON column #956

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions lib/column/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ type JSONObject struct {
name string
root bool
encoding uint8
decoding Interface
tz *time.Location
}

Expand Down Expand Up @@ -846,14 +847,18 @@ func (jCol *JSONObject) Rows() int {
return 0
}

// ClickHouse returns JSON as a tuple i.e. these will never be invoked

func (jCol *JSONObject) Row(i int, ptr bool) interface{} {
panic("Not implemented")
if jCol.decoding != nil {
return jCol.decoding.Row(i, ptr)
}
panic("bug")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is exactly the case here? Decode func wasn't called prior?
let's make it more describtive

}

func (jCol *JSONObject) ScanRow(dest interface{}, row int) error {
panic("Not implemented")
if jCol.decoding != nil {
return jCol.decoding.ScanRow(dest, row)
}
panic("bug")
}

func (jCol *JSONObject) Append(v interface{}) (nulls []uint8, err error) {
Expand Down Expand Up @@ -908,7 +913,15 @@ func (jCol *JSONObject) AppendRow(v interface{}) error {
}

func (jCol *JSONObject) Decode(reader *proto.Reader, rows int) error {
panic("Not implemented")
columnType, err := reader.Str()
if err != nil {
return err
}
jCol.decoding, err = Type(columnType).Column("", jCol.tz)
if err != nil {
return err
}
return jCol.decoding.Decode(reader, rows)
}

func (jCol *JSONObject) Encode(buffer *proto.Buffer) {
Expand Down
75 changes: 72 additions & 3 deletions tests/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2456,13 +2456,13 @@ func TestMultipleJsonRowsWithNil(t *testing.T) {
for k := range myMap {
newMap[k] = myMap[k]
}

return newMap
}

type Login struct {
Username string `json:"username"`
Attachment map[string]interface{}
Username string `json:"username"`
Attachment map[string]interface{}
}

myAttachment := map[string]interface{}{
Expand All @@ -2483,3 +2483,72 @@ func TestMultipleJsonRowsWithNil(t *testing.T) {

require.NoError(t, batch.Send())
}

func TestComplexJSONWithDistributed(t *testing.T) {
conn, teardown := setupTest(t)
defer teardown(t)

ctx := context.Background()
cluster := ""
require.NoError(t, conn.QueryRow(ctx, "SELECT `cluster` FROM system.clusters LIMIT 1;").Scan(&cluster))
conn.Exec(ctx, "DROP TABLE IF EXISTS json_test_distributed")
ddl := fmt.Sprintf(`CREATE table json_test_distributed(event JSON) ENGINE = Distributed('%s', currentDatabase(), 'json_test', rand());`, cluster)
require.NoError(t, conn.Exec(ctx, ddl))
defer func() {
require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS json_test_distributed"))
}()

batch := prepareBatch(t, conn, ctx)
row1 := GithubEvent{
Title: "Document JSON support",
Type: "Issue",
Assignee: Account{
Id: 1244,
Name: "Geoff",
Achievement: Achievement{Name: "Mars Star", AwardedDate: testDate.Truncate(time.Second)},
Repositories: []Repository{{URL: "https://github.com/ClickHouse/clickhouse-python", Releases: []Releases{{Version: "1.0.0"}, {Version: "1.1.0"}}}, {URL: "https://github.com/ClickHouse/clickhouse-go", Releases: []Releases{{Version: "2.0.0"}, {Version: "2.1.0"}}}},
Organizations: []string{"Support Engineer", "Integrations"},
},
Labels: []string{"Help wanted"},
Contributors: []Account{
{Id: 2244, Achievement: Achievement{Name: "Adding JSON to go driver", AwardedDate: testDate.Truncate(time.Second).Add(time.Hour * -500)}, Organizations: []string{"Support Engineer", "Consulting", "PM", "Integrations"}, Name: "Dale", Repositories: []Repository{{URL: "https://github.com/ClickHouse/clickhouse-go", Releases: []Releases{{Version: "2.0.0"}, {Version: "2.1.0"}}}, {URL: "https://github.com/grafana/clickhouse", Releases: []Releases{{Version: "1.2.0"}, {Version: "1.3.0"}}}}},
{Id: 2344, Achievement: Achievement{Name: "Managing S3 buckets", AwardedDate: testDate.Truncate(time.Second).Add(time.Hour * -700)}, Organizations: []string{"Support Engineer", "Consulting"}, Name: "Melyvn", Repositories: []Repository{{URL: "https://github.com/ClickHouse/support", Releases: []Releases{{Version: "1.0.0"}, {Version: "2.3.0"}, {Version: "2.4.0"}}}}},
},
}
row2 := GithubEvent{
Title: "Document JSON support 2",
Type: "Issue",
Assignee: Account{
Id: 1245,
Name: "Geoff",
Achievement: Achievement{Name: "Mars Star", AwardedDate: testDate.Truncate(time.Second)},
Repositories: []Repository{{URL: "https://github.com/ClickHouse/clickhouse-python", Releases: []Releases{{Version: "1.0.0"}, {Version: "1.1.0"}}}, {URL: "https://github.com/ClickHouse/clickhouse-go", Releases: []Releases{{Version: "2.0.0"}, {Version: "2.1.0"}}}},
Organizations: []string{"Support Engineer", "Integrations"},
},
Labels: []string{"Help wanted"},
Contributors: []Account{
{Id: 2244, Achievement: Achievement{Name: "Adding JSON to go driver", AwardedDate: testDate.Truncate(time.Second).Add(time.Hour * -500)}, Organizations: []string{"Support Engineer", "Consulting", "PM", "Integrations"}, Name: "Dale", Repositories: []Repository{{URL: "https://github.com/ClickHouse/clickhouse-go", Releases: []Releases{{Version: "2.0.0"}, {Version: "2.1.0"}}}, {URL: "https://github.com/grafana/clickhouse", Releases: []Releases{{Version: "1.2.0"}, {Version: "1.3.0"}}}}},
{Id: 2344, Achievement: Achievement{Name: "Managing S3 buckets", AwardedDate: testDate.Truncate(time.Second).Add(time.Hour * -700)}, Organizations: []string{"Support Engineer", "Consulting"}, Name: "Melyvn", Repositories: []Repository{{URL: "https://github.com/ClickHouse/support", Releases: []Releases{{Version: "1.0.0"}, {Version: "2.3.0"}, {Version: "2.4.0"}}}}},
},
}
require.NoError(t, batch.Append(row1, row2))
require.NoError(t, batch.Send())

var (
event1 GithubEvent
event2 GithubEvent
n int
)
rows, err := conn.Query(ctx, "SELECT * FROM json_test_distributed ORDER BY assignee.id")
require.NoError(t, err)
for rows.Next() {
if n == 0 {
require.NoError(t, rows.Scan(event1))
} else {
require.NoError(t, rows.Scan(event2))
}
n++
}
assert.JSONEq(t, toJson(row1), toJson(event1))
assert.JSONEq(t, toJson(row2), toJson(event2))
}