Compare commits

...

2 Commits

Author SHA1 Message Date
boring_nick fc50c042e5 migrate multiple files from a folder 2023-10-28 18:41:01 +03:00
boring_nick 6e08dc2cf9 check for duplicates in the supibot migrator 2023-10-28 18:05:41 +03:00
4 changed files with 160 additions and 107 deletions

View File

@ -24,9 +24,7 @@ pub enum Command {
},
MigrateSupibot {
#[clap(short, long)]
file: PathBuf,
#[clap(short, long)]
channel_id: String,
logs_dir: PathBuf,
#[clap(short, long)]
users_file: PathBuf,
},

View File

@ -77,10 +77,9 @@ async fn main() -> anyhow::Result<()> {
jobs,
}) => migrate(db, source_dir, channel_id, jobs).await,
Some(Command::MigrateSupibot {
file,
channel_id,
logs_dir,
users_file,
}) => migrator::supibot::run(db, &file, &channel_id, &users_file).await,
}) => migrator::supibot::run(config, db, &logs_dir, &users_file).await,
}
}

View File

@ -2,12 +2,14 @@ mod users;
use super::INSERT_BATCH_SIZE;
use crate::{
config::Config,
db::schema::{Message, MESSAGES_TABLE},
error::Error,
migrator::supibot::users::UsersClient,
};
use anyhow::Context;
use chrono::NaiveDateTime;
use clickhouse::inserter::Inserter;
use chrono::{NaiveDate, NaiveDateTime};
use clickhouse::{inserter::Inserter, Row};
use serde::Deserialize;
use std::{
borrow::Cow,
@ -24,43 +26,114 @@ const DATE_FMT: &str = "%F %X%.3f";
const USERS_REQUEST_CHUNK_SIZE: usize = 50;
pub async fn run(
config: Config,
db: clickhouse::Client,
file_path: &Path,
channel_id: &str,
logs_path: &Path,
users_file_path: &Path,
) -> anyhow::Result<()> {
info!("Loading file {file_path:?}");
let inserter = db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
let mut users_client = UsersClient::default();
users_client
.add_from_file(users_file_path)
.context("Could not read the users file")?;
let channel_user = users_client.get_user_login(channel_id).await?;
let read_dir = std::fs::read_dir(logs_path)?;
let migrator = SupibotMigrator {
users_client,
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
inserter,
channel_id: channel_id.to_owned(),
channel_login: channel_user,
invalid_user_ids: HashSet::new(),
};
for entry in read_dir {
let entry = entry?;
let file_name = entry.file_name();
let file_name = file_name.to_str().expect("File name is invalid UTF-8");
migrator.migrate_channel(file_path).await
if let Some(channel_name) = file_name.strip_suffix(".csv") {
let channel_id = users_client
.get_user_id_by_name(channel_name)
.await?
.context("Could not get channel id")?;
info!("Migrating file {file_name}, channel id {channel_id}");
let inserter = db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
let existing_dates = get_existing_dates(&db, &channel_id).await?;
let migrator = SupibotMigrator {
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
inserter,
channel_id: channel_id.to_owned(),
channel_login: channel_name.to_owned(),
invalid_user_ids: HashSet::new(),
existing_dates,
imported_count: 0,
skipped_count: 0,
};
info!("Adding channel {channel_id} to config");
config
.channels
.write()
.unwrap()
.insert(channel_id.to_owned());
config.save()?;
match migrator
.migrate_channel(&entry.path(), &mut users_client)
.await
{
Ok((imported_count, skipped_count)) => {
info!("Channel {channel_name} successfully migrated:");
info!("Imported {imported_count} messages");
info!("{skipped_count} messages were skipped due to duplicate dates",);
}
Err(err) => {
error!("Could not migrate channel {channel_name}: {err:#}");
}
}
}
}
Ok(())
}
/// Collects the calendar dates on which `channel_id` already has rows in the
/// `message` table.
///
/// The query asks the database for the distinct start-of-day values of the
/// `timestamp` column; each value is read as a Unix timestamp in seconds and
/// reduced to its date component.
///
/// # Errors
///
/// Propagates any error produced while running the query or decoding rows.
///
/// # Panics
///
/// Panics if a returned value cannot be represented as a `NaiveDateTime`.
async fn get_existing_dates(
    db: &clickhouse::Client,
    channel_id: &str,
) -> Result<HashSet<NaiveDate>, Error> {
    /// A single result row: the start of a day as seconds since the epoch.
    #[derive(Row, Deserialize)]
    struct DateRow {
        datetime: u32,
    }

    info!("Getting existing log dates");

    let query = db
        .query(
            "SELECT DISTINCT toStartOfDay(timestamp) as datetime FROM message WHERE channel_id = ?",
        )
        .bind(channel_id);
    let rows = query.fetch_all::<DateRow>().await?;

    // NOTE(review): the value is interpreted as seconds since the epoch and
    // turned into a naive date without any timezone adjustment; confirm that
    // the database's day boundaries line up with the dates being compared.
    let mut dates: HashSet<NaiveDate> = HashSet::with_capacity(rows.len());
    for DateRow { datetime } in rows {
        let start_of_day = NaiveDateTime::from_timestamp_opt(i64::from(datetime), 0)
            .expect("Invalid timestamp returned by the db");
        dates.insert(start_of_day.date());
    }

    info!(
        "Found {} dates where the channel already has logs",
        dates.len()
    );

    Ok(dates)
}
struct SupibotMigrator {
users_client: UsersClient,
existing_dates: HashSet<NaiveDate>,
/// Messages whose users are not cached
/// Indexed by user id
non_cached_messages: HashMap<String, Vec<SupibotMessage>>,
@ -68,34 +141,32 @@ struct SupibotMigrator {
channel_id: String,
channel_login: String,
invalid_user_ids: HashSet<String>,
imported_count: u64,
skipped_count: u64,
}
impl SupibotMigrator {
async fn migrate_channel(mut self, file_path: &Path) -> anyhow::Result<()> {
async fn migrate_channel(
mut self,
file_path: &Path,
users_client: &mut UsersClient,
) -> anyhow::Result<(u64, u64)> {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let rdr = csv::Reader::from_reader(reader);
for (i, result) in rdr.into_deserialize::<SupibotMessage>().enumerate() {
if i % 10000 == 0 {
if i % 100_000 == 0 {
info!("Processing message {}", i + 1);
}
let supibot_message = result?;
if supibot_message.historic == 0 {
if let Some(user_login) = self
.users_client
.get_cached_user_login(&supibot_message.platform_id)
if let Some(user_login) =
users_client.get_cached_user_login(&supibot_message.platform_id)
{
write_message(
&supibot_message,
user_login,
&supibot_message.platform_id,
&self.channel_login,
&self.channel_id,
&mut self.inserter,
)
.await?;
self.write_message(&supibot_message, user_login, &supibot_message.platform_id)
.await?;
} else {
self.non_cached_messages
.entry(supibot_message.platform_id.clone())
@ -103,26 +174,18 @@ impl SupibotMigrator {
.push(supibot_message);
if self.non_cached_messages.len() >= USERS_REQUEST_CHUNK_SIZE {
self.flush_non_cached().await?;
self.flush_non_cached(users_client).await?;
}
}
} else {
let user_id = self
.users_client
let user_id = users_client
.get_user_id_by_name(&supibot_message.platform_id)
.await?
// Used when the user id cannot be retrieved
.unwrap_or_default();
write_message(
&supibot_message,
&supibot_message.platform_id,
&user_id,
&self.channel_login,
&self.channel_id,
&mut self.inserter,
)
.await?;
self.write_message(&supibot_message, &supibot_message.platform_id, &user_id)
.await?;
}
let stats = self
@ -137,7 +200,7 @@ impl SupibotMigrator {
);
}
}
self.flush_non_cached().await?;
self.flush_non_cached(users_client).await?;
let stats = self
.inserter
@ -155,26 +218,19 @@ impl SupibotMigrator {
error!("Invalid user ids: {:?}", self.invalid_user_ids);
}
Ok(())
Ok((self.imported_count, self.skipped_count))
}
async fn flush_non_cached(&mut self) -> anyhow::Result<()> {
async fn flush_non_cached(&mut self, users_client: &mut UsersClient) -> anyhow::Result<()> {
let user_ids = self.non_cached_messages.keys().collect::<Vec<_>>();
let users = self.users_client.get_users(&user_ids).await?;
let users = users_client.get_users(&user_ids).await?;
for (user_id, messages) in self.non_cached_messages.drain() {
let non_cached_messages = std::mem::take(&mut self.non_cached_messages);
for (user_id, messages) in non_cached_messages {
match users.get(&user_id) {
Some(user_login) => {
for message in messages {
write_message(
&message,
user_login,
&user_id,
&self.channel_login,
&self.channel_id,
&mut self.inserter,
)
.await?;
self.write_message(&message, user_login, &user_id).await?;
// write_message(message, user, &self.channel_user, &mut self.inserter)
// .await?;
}
@ -187,39 +243,44 @@ impl SupibotMigrator {
Ok(())
}
}
async fn write_message(
supibot_message: &SupibotMessage,
user_login: &str,
user_id: &str,
channel_login: &str,
channel_id: &str,
inserter: &mut Inserter<Message<'_>>,
) -> anyhow::Result<()> {
let text = &supibot_message.text;
async fn write_message(
&mut self,
supibot_message: &SupibotMessage,
user_login: &str,
user_id: &str,
) -> anyhow::Result<()> {
let text = &supibot_message.text;
let datetime = NaiveDateTime::parse_from_str(&supibot_message.posted, DATE_FMT).unwrap();
let timestamp = NaiveDateTime::parse_from_str(&supibot_message.posted, DATE_FMT)
.unwrap()
.timestamp_millis() as u64;
if self.existing_dates.contains(&datetime.date()) {
self.skipped_count += 1;
return Ok(());
}
let raw = format!(
"@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
display_name = user_login,
user_id = user_id,
login = user_login,
color = "",
);
let timestamp = datetime.timestamp_millis() as u64;
let raw = format!(
"@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
channel_id = self.channel_id,
channel_login = self.channel_login,
display_name = user_login,
user_id = user_id,
login = user_login,
color = "",
);
let message = Message {
channel_id: Cow::Owned(channel_id.to_owned()),
user_id: Cow::Owned(user_id.to_owned()),
timestamp,
raw: Cow::Owned(raw),
};
let message = Message {
channel_id: Cow::Owned(self.channel_id.to_owned()),
user_id: Cow::Owned(user_id.to_owned()),
timestamp,
raw: Cow::Owned(raw),
};
inserter.write(&message).await?;
Ok(())
self.inserter.write(&message).await?;
self.imported_count += 1;
Ok(())
}
}
#[derive(Deserialize)]

View File

@ -94,14 +94,9 @@ impl UsersClient {
Ok(response_users)
}
/// Looks up a single user by `id` through `get_users` and returns the first
/// value of the resulting collection.
///
/// # Errors
///
/// Propagates any error from `get_users`, and fails with
/// "Empty ivr response" when the lookup yields no entries.
pub async fn get_user_login(&mut self, id: &str) -> anyhow::Result<String> {
    let mut logins = self.get_users(&[id]).await?.into_values();
    // Only one id was requested, so the first value (if any) is the answer.
    logins.next().context("Empty ivr response")
}
pub async fn get_user_id_by_name(&mut self, name: &str) -> anyhow::Result<Option<String>> {
match self.names.get(name) {
Some(id) => Ok(id.as_ref().map(|id| self.users.get(id).cloned().unwrap())),
Some(id) => Ok(id.clone()),
None => {
debug!("Fetching info for name {name}");
let response = self
@ -128,7 +123,7 @@ impl UsersClient {
Some(user) => {
self.names.insert(user.login.clone(), Some(user.id.clone()));
self.users.insert(user.id.clone(), user.login.clone());
Ok(Some(user.login))
Ok(Some(user.id))
}
None => {
warn!("User {name} cannot be retrieved");