
Better error parsing on invalid DDL #93

Open

alexanderspevak opened this issue Nov 3, 2023 · 1 comment

Labels: bug (Something isn't working)

alexanderspevak commented Nov 3, 2023

Hello, I am getting the following error:

[screenshot: "Cannot read all data" error]

I am using version 0.11.6. I do not see any discrepancies between the struct schema and the migration. I have limited the size of the inserted batch to 5000 rows and I am still having issues.

use capnp::message::ReaderOptions;
use capnp::serialize;
use clickhouse::{error::Result, sql, Client, Row};
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use serde::{Deserialize, Serialize};
use std::error::Error;

use std::{
    ops::Add,
    time::{Duration, SystemTime},
};

pub mod http_log_capnp;

#[derive(Debug, Serialize, Deserialize, Row)]
struct LogRow {
    timestamp: u64,
    resource_id: u64,
    bytes_sent: u64,
    request_time_milli: u64,
    response_status: u16,
    cache_status: String, //LowCardinality
    method: String,       //LowCardinality
    remote_addr: String,
    url: String,
}

impl LogRow {
    fn new(reader: http_log_capnp::http_log_record::Reader) -> Result<LogRow, capnp::Error> {
        let timestamp = reader.get_timestamp_epoch_milli() / 1000;
        let resource_id = reader.get_resource_id();
        let bytes_sent = reader.get_bytes_sent();
        let request_time_milli = reader.get_request_time_milli();
        let response_status = reader.get_response_status();
        let cache_status = reader.get_cache_status()?.to_owned();
        let method = reader.get_method()?.to_owned();
        let remote_addr = reader.get_remote_addr()?.to_owned();
        let url = reader.get_url()?.to_owned();

        Ok(LogRow {
            timestamp,
            resource_id,
            bytes_sent,
            request_time_milli,
            response_status,
            cache_status,
            method,
            remote_addr,
            url,
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
        .with_topic("http_log".to_string())
        .with_fallback_offset(FetchOffset::Earliest)
        .with_group("http_log".to_string())
        .with_offset_storage(Some(GroupOffsetStorage::Kafka))
        .create()
        .unwrap();

    let ddl = r"
            CREATE TABLE IF NOT EXISTS http_log (
                timestamp UInt64,
                resource_id UInt64,
                bytes_sent UInt64, 
                request_time_milli UInt64,
                response_status UInt16,
                cache_status String,
                method String,
                remote_addr String,
                url String
            )
            ENGINE = MergeTree";

    let client = Client::default().with_url("http://localhost:8124");

    client.query(ddl).execute().await?;
    let mut storage = vec![];
    let start = SystemTime::now();
    while start.elapsed().unwrap().as_secs() < 67 {
        for ms in consumer.poll().unwrap().iter() {
            for m in ms.messages() {
                let reader = serialize::read_message(m.value, ReaderOptions::new()).unwrap();

                let reader = reader
                    .get_root::<http_log_capnp::http_log_record::Reader>()
                    .unwrap();
                let row = LogRow::new(reader).unwrap();
                storage.push(row);
            }
        }
        consumer.commit_consumed().unwrap();
    }

    let mut inserter = client.inserter("http_log")?.with_max_entries(5000);

    for (idx, row) in storage.iter().enumerate() {
        if idx > 4998 {
            break;
        }

        inserter.write(row).await?;
        inserter.commit().await?;
    }
    inserter.end().await?;

    Ok(())
}
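
(The Kafka consumption is incidental: a minimal sketch that should reproduce the same opaque error with just the invalid DDL, assuming a local ClickHouse reachable on port 8124 as above.)

use clickhouse::Client;

#[tokio::main]
async fn main() {
    let client = Client::default().with_url("http://localhost:8124");

    // MergeTree without an ORDER BY (or PRIMARY KEY) clause is rejected by the
    // server; per this report, the client surfaces that rejection as the opaque
    // "Cannot read all data" error rather than the server's exception text.
    let err = client
        .query("CREATE TABLE http_log_min (timestamp UInt64) ENGINE = MergeTree")
        .execute()
        .await
        .expect_err("invalid DDL should fail");
    eprintln!("{err}");
}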
alexanderspevak changed the title from "Cannot read al data error." to "Cannot read all data error." Nov 3, 2023
alexanderspevak (Author) commented:
Fixed: ENGINE = MergeTree needs an ORDER BY clause.
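
For reference, the same DDL with an ORDER BY added runs cleanly; the sorting key below is only an illustrative choice (ORDER BY tuple() would also satisfy MergeTree):

    let ddl = r"
            CREATE TABLE IF NOT EXISTS http_log (
                timestamp UInt64,
                resource_id UInt64,
                bytes_sent UInt64,
                request_time_milli UInt64,
                response_status UInt16,
                cache_status String,
                method String,
                remote_addr String,
                url String
            )
            ENGINE = MergeTree
            ORDER BY (resource_id, timestamp)";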

loyd changed the title from "Cannot read all data error." to "Better error parsing on invalid DDL" Nov 6, 2023
slvrtrn closed this as completed Jul 29, 2024
slvrtrn reopened this Jul 29, 2024
slvrtrn added the bug (Something isn't working) label Jul 29, 2024