Compare commits

...

11 Commits

Author SHA1 Message Date
boring_nick fc50c042e5 migrate multiple files from a folder 2023-10-28 18:41:01 +03:00
boring_nick 6e08dc2cf9 check for duplicates in the supibot migrator 2023-10-28 18:05:41 +03:00
boring_nick eb2b5c876c load users from a file 2023-10-27 11:35:11 +03:00
boring_nick 9aef7121fc Merge branch 'master' into supibot 2023-10-26 19:46:23 +03:00
boring_nick 822f32ae5d Merge branch 'master' into supibot 2023-07-07 10:33:00 +03:00
boring_nick 25d290b640 Merge branch 'master' into supibot 2023-06-27 17:32:16 +03:00
boring_nick f41b4cd8cb handle users which cannot be retrieved 2023-06-24 14:08:41 +03:00
boring_nick e888fb1933 handle historic messages 2023-06-24 13:10:37 +03:00
boring_nick 0c2ca8ded4 better irc forming 2023-06-24 09:52:08 +03:00
boring_nick 1c5ff2f7a5 better ivr batching 2023-06-24 09:23:40 +03:00
boring_nick 935842bafa wip supibot migration 2023-06-24 08:56:06 +03:00
7 changed files with 512 additions and 0 deletions

Cargo.lock generated

@@ -562,6 +562,27 @@ dependencies = [
"typenum",
]
[[package]]
name = "csv"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "dashmap"
version = "5.5.3"
@@ -1027,6 +1048,15 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.9"
@@ -1699,10 +1729,12 @@ dependencies = [
"chrono",
"clap",
"clickhouse",
"csv",
"dashmap",
"flate2",
"futures",
"indexmap 1.9.3",
"itertools",
"lazy_static",
"metrics-prometheus",
"mimalloc",

Cargo.toml

@@ -24,6 +24,7 @@ rand = "0.8.5"
rayon = "1.7.0"
reqwest = { version = "0.11.20", features = [
"rustls-tls",
"json",
], default-features = false }
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] }
schemars = "0.8.13"
@@ -54,6 +55,8 @@ twitch_api2 = { version = "0.6.1", features = [
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
axum-prometheus = "0.4.0"
metrics-prometheus = "0.4.1"
csv = "1.2.2"
itertools = "0.11.0"
async-trait = "0.1.73"
[dev-dependencies]

src/args.rs

@@ -1,4 +1,5 @@
use clap::{Parser, Subcommand};
use std::path::PathBuf;
#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
@@ -21,4 +22,10 @@ pub enum Command {
#[clap(short, long, default_value_t = 1)]
jobs: usize,
},
MigrateSupibot {
#[clap(short, long)]
logs_dir: PathBuf,
#[clap(short, long)]
users_file: PathBuf,
},
}
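For context, clap's derive API exposes the new variant as a `migrate-supibot` subcommand with `--logs-dir` and `--users-file` flags. A minimal self-contained sketch of how it parses (the top-level parser struct is elided in the diff above, so the `Args` name here is assumed):

use clap::{Parser, Subcommand};
use std::path::PathBuf;

// `Args` stands in for the (elided) top-level Parser struct from this file.
#[derive(Parser)]
struct Args {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    MigrateSupibot {
        #[clap(short, long)]
        logs_dir: PathBuf,
        #[clap(short, long)]
        users_file: PathBuf,
    },
}

fn main() {
    // clap derives kebab-case names, so the variant parses as `migrate-supibot`:
    let args = Args::parse_from([
        "app",
        "migrate-supibot",
        "--logs-dir", "./supibot-logs",
        "--users-file", "./users.csv",
    ]);
    let Command::MigrateSupibot { logs_dir, users_file } = args.command;
    println!("{} {}", logs_dir.display(), users_file.display());
}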

src/main.rs

@@ -76,6 +76,10 @@ async fn main() -> anyhow::Result<()> {
channel_id,
jobs,
}) => migrate(db, source_dir, channel_id, jobs).await,
Some(Command::MigrateSupibot {
logs_dir,
users_file,
}) => migrator::supibot::run(config, db, &logs_dir, &users_file).await,
}
}

src/migrator/mod.rs

@@ -1,4 +1,5 @@
mod reader;
pub mod supibot;
use self::reader::{LogsReader, COMPRESSED_CHANNEL_FILE, UNCOMPRESSED_CHANNEL_FILE};
use crate::{

src/migrator/supibot/mod.rs Normal file

@@ -0,0 +1,315 @@
mod users;
use super::INSERT_BATCH_SIZE;
use crate::{
config::Config,
db::schema::{Message, MESSAGES_TABLE},
error::Error,
migrator::supibot::users::UsersClient,
};
use anyhow::Context;
use chrono::{NaiveDate, NaiveDateTime};
use clickhouse::{inserter::Inserter, Row};
use serde::Deserialize;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fs::File,
io::BufReader,
path::Path,
time::Duration,
};
use tracing::{error, info};
// Example: 2023-06-23 14:46:26.588
const DATE_FMT: &str = "%F %X%.3f";
const USERS_REQUEST_CHUNK_SIZE: usize = 50;
pub async fn run(
config: Config,
db: clickhouse::Client,
logs_path: &Path,
users_file_path: &Path,
) -> anyhow::Result<()> {
let mut users_client = UsersClient::default();
users_client
.add_from_file(users_file_path)
.context("Could not read the users file")?;
let read_dir = std::fs::read_dir(logs_path)?;
for entry in read_dir {
let entry = entry?;
let file_name = entry.file_name();
let file_name = file_name.to_str().expect("File name is invalid UTF-8");
if let Some(channel_name) = file_name.strip_suffix(".csv") {
let channel_id = users_client
.get_user_id_by_name(channel_name)
.await?
.context("Could not get channel id")?;
info!("Migrating file {file_name}, channel id {channel_id}");
let inserter = db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(30)),
Some(Duration::from_secs(180)),
)
.with_max_entries(INSERT_BATCH_SIZE)
.with_period(Some(Duration::from_secs(15)));
let existing_dates = get_existing_dates(&db, &channel_id).await?;
let migrator = SupibotMigrator {
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
inserter,
channel_id: channel_id.to_owned(),
channel_login: channel_name.to_owned(),
invalid_user_ids: HashSet::new(),
existing_dates,
imported_count: 0,
skipped_count: 0,
};
info!("Adding channel {channel_id} to config");
config
.channels
.write()
.unwrap()
.insert(channel_id.to_owned());
config.save()?;
match migrator
.migrate_channel(&entry.path(), &mut users_client)
.await
{
Ok((imported_count, skipped_count)) => {
info!("Channel {channel_name} successfully migrated:");
info!("Imported {imported_count} messages");
info!("{skipped_count} messages were skipped due to duplicate dates",);
}
Err(err) => {
error!("Could not migrate channel {channel_name}: {err:#}");
}
}
}
}
Ok(())
}
async fn get_existing_dates(
db: &clickhouse::Client,
channel_id: &str,
) -> Result<HashSet<NaiveDate>, Error> {
info!("Getting existing log dates");
#[derive(Row, Deserialize)]
struct DateRow {
datetime: u32,
}
let raw_dates = db
.query(
"SELECT DISTINCT toStartOfDay(timestamp) as datetime FROM message WHERE channel_id = ?",
)
.bind(channel_id)
.fetch_all::<DateRow>()
.await?;
let dates: HashSet<NaiveDate> = raw_dates
.into_iter()
.map(|row| {
let datetime = NaiveDateTime::from_timestamp_opt(row.datetime as i64, 0)
.expect("Invalid timestamp returned by the db");
datetime.date()
})
.collect();
info!(
"Found {} dates where the channel already has logs",
dates.len()
);
Ok(dates)
}
struct SupibotMigrator {
existing_dates: HashSet<NaiveDate>,
/// Messages whose users are not cached
/// Indexed by user id
non_cached_messages: HashMap<String, Vec<SupibotMessage>>,
inserter: Inserter<Message<'static>>,
channel_id: String,
channel_login: String,
invalid_user_ids: HashSet<String>,
imported_count: u64,
skipped_count: u64,
}
impl SupibotMigrator {
async fn migrate_channel(
mut self,
file_path: &Path,
users_client: &mut UsersClient,
) -> anyhow::Result<(u64, u64)> {
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let rdr = csv::Reader::from_reader(reader);
for (i, result) in rdr.into_deserialize::<SupibotMessage>().enumerate() {
if i % 100_000 == 0 {
info!("Processing message {}", i + 1);
}
let supibot_message = result?;
if supibot_message.historic == 0 {
if let Some(user_login) =
users_client.get_cached_user_login(&supibot_message.platform_id)
{
self.write_message(&supibot_message, user_login, &supibot_message.platform_id)
.await?;
} else {
self.non_cached_messages
.entry(supibot_message.platform_id.clone())
.or_default()
.push(supibot_message);
if self.non_cached_messages.len() >= USERS_REQUEST_CHUNK_SIZE {
self.flush_non_cached(users_client).await?;
}
}
} else {
let user_id = users_client
.get_user_id_by_name(&supibot_message.platform_id)
.await?
// Used when the user id cannot be retrieved
.unwrap_or_default();
self.write_message(&supibot_message, &supibot_message.platform_id, &user_id)
.await?;
}
let stats = self
.inserter
.commit()
.await
.context("Could not flush messages")?;
if stats.entries > 0 {
info!(
"DB: {} entries ({} transactions) have been inserted",
stats.entries, stats.transactions,
);
}
}
self.flush_non_cached(users_client).await?;
let stats = self
.inserter
.end()
.await
.context("Could not flush messages")?;
if stats.entries > 0 {
info!(
"DB: {} entries ({} transactions) have been inserted",
stats.entries, stats.transactions,
);
}
if !self.invalid_user_ids.is_empty() {
error!("Invalid user ids: {:?}", self.invalid_user_ids);
}
Ok((self.imported_count, self.skipped_count))
}
async fn flush_non_cached(&mut self, users_client: &mut UsersClient) -> anyhow::Result<()> {
let user_ids = self.non_cached_messages.keys().collect::<Vec<_>>();
let users = users_client.get_users(&user_ids).await?;
let non_cached_messages = std::mem::take(&mut self.non_cached_messages);
for (user_id, messages) in non_cached_messages {
match users.get(&user_id) {
Some(user_login) => {
for message in messages {
self.write_message(&message, user_login, &user_id).await?;
// write_message(message, user, &self.channel_user, &mut self.inserter)
// .await?;
}
}
None => {
self.invalid_user_ids.insert(user_id);
}
}
}
Ok(())
}
async fn write_message(
&mut self,
supibot_message: &SupibotMessage,
user_login: &str,
user_id: &str,
) -> anyhow::Result<()> {
let text = &supibot_message.text;
let datetime = NaiveDateTime::parse_from_str(&supibot_message.posted, DATE_FMT).unwrap();
if self.existing_dates.contains(&datetime.date()) {
self.skipped_count += 1;
return Ok(());
}
let timestamp = datetime.timestamp_millis() as u64;
let raw = format!(
"@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
channel_id = self.channel_id,
channel_login = self.channel_login,
display_name = user_login,
user_id = user_id,
login = user_login,
color = "",
);
let message = Message {
channel_id: Cow::Owned(self.channel_id.to_owned()),
user_id: Cow::Owned(user_id.to_owned()),
timestamp,
raw: Cow::Owned(raw),
};
self.inserter.write(&message).await?;
self.imported_count += 1;
Ok(())
}
}
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct SupibotMessage {
#[serde(rename = "ID")]
pub _id: u64,
#[serde(rename = "Platform_ID")]
pub platform_id: String,
pub historic: u8,
pub text: String,
pub posted: String,
}
#[cfg(test)]
mod tests {
use super::DATE_FMT;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
#[test]
fn parse_date() {
let date_str = "2023-06-23 14:46:26.588";
let datetime = NaiveDateTime::parse_from_str(date_str, DATE_FMT).unwrap();
assert_eq!(
datetime,
NaiveDateTime::new(
NaiveDate::from_ymd_opt(2023, 6, 23).unwrap(),
NaiveTime::from_hms_milli_opt(14, 46, 26, 588).unwrap()
)
);
}
}
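To make the synthetic IRC framing in `write_message` concrete, here is a minimal sketch that fills the same format string with hypothetical values (names, ids, and the timestamp are made up):

fn main() {
    // Hypothetical sample values; the format string mirrors write_message above.
    let (channel_id, channel_login) = ("123", "somechannel");
    let (user_id, login, text) = ("456", "someuser", "hello world");
    let timestamp: u64 = 1_687_531_586_588; // hypothetical tmi-sent-ts in milliseconds
    let raw = format!(
        "@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
        display_name = login,
        color = "",
    );
    println!("{raw}");
}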

src/migrator/supibot/users.rs Normal file

@@ -0,0 +1,150 @@
use anyhow::{anyhow, Context};
use serde::Deserialize;
use std::{collections::HashMap, fs::File, io::BufReader, path::Path};
use tracing::{debug, info, warn};
#[derive(Default)]
pub struct UsersClient {
client: reqwest::Client,
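// User ids mapped to login names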
users: HashMap<String, String>,
// Login names mapped to ids (None when the user could not be retrieved)
names: HashMap<String, Option<String>>,
}
#[derive(Deserialize)]
struct FileUser {
#[serde(rename = "Name")]
name: String,
#[serde(rename = "ID")]
id: String,
}
impl UsersClient {
pub fn add_from_file(&mut self, file_path: &Path) -> anyhow::Result<()> {
info!("Loading users from {file_path:?}");
let file = File::open(file_path)?;
let reader = BufReader::new(file);
let rdr = csv::Reader::from_reader(reader);
for user in rdr.into_deserialize::<FileUser>() {
let user = user?;
self.users.insert(user.id.clone(), user.name.clone());
self.names.insert(user.name, Some(user.id));
}
info!("{} users loaded", self.users.len());
Ok(())
}
pub async fn get_users(
&mut self,
ids: &[impl AsRef<str>],
) -> anyhow::Result<HashMap<String, String>> {
let mut ids_to_request = Vec::with_capacity(ids.len());
let mut response_users = HashMap::with_capacity(ids.len());
for id in ids {
match self.users.get(id.as_ref()) {
Some(name) => {
response_users.insert(id.as_ref().to_owned(), name.clone());
}
None => {
ids_to_request.push(id.as_ref());
}
}
}
let request_futures = ids_to_request.chunks(50).map(|chunk| {
debug!("Requesting a chunk of {} users", chunk.len());
async {
let response = self
.client
.get("https://api.ivr.fi/v2/twitch/user")
.query(&[("id", chunk.join(","))])
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow!(
"Got an error from IVR API: {} {}",
response.status(),
response.text().await?
));
}
Ok(response.json::<Vec<IvrUser>>().await?)
}
});
// let results = join_all(request_futures).await;
let mut results = Vec::new();
for future in request_futures {
results.push(future.await);
}
for result in results {
let api_response = result?;
for user in api_response {
self.users.insert(user.id.clone(), user.login.clone());
response_users.insert(user.id.clone(), user.login);
}
}
Ok(response_users)
}
pub async fn get_user_id_by_name(&mut self, name: &str) -> anyhow::Result<Option<String>> {
match self.names.get(name) {
Some(id) => Ok(id.clone()),
None => {
debug!("Fetching info for name {name}");
let response = self
.client
.get("https://api.ivr.fi/v2/twitch/user")
.query(&[("login", name)])
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow!(
"Got an error from IVR API: {} {}",
response.status(),
response.text().await?
));
}
let users: Vec<IvrUser> = response
.json()
.await
.context("Could not deserialize IVR response")?;
match users.into_iter().next() {
Some(user) => {
self.names.insert(user.login.clone(), Some(user.id.clone()));
self.users.insert(user.id.clone(), user.login.clone());
Ok(Some(user.id))
}
None => {
warn!("User {name} cannot be retrieved");
self.names.insert(name.to_owned(), None);
Ok(None)
}
}
}
}
}
pub fn get_cached_user_login(&self, id: &str) -> Option<&str> {
self.users.get(id).map(|s| s.as_str())
}
}
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IvrUser {
pub id: String,
pub display_name: String,
pub login: String,
pub chat_color: Option<String>,
}
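For reference, a minimal sketch of the users file shape that `add_from_file` expects (the header names come from `FileUser`; the data row is hypothetical), parsed with the same csv + serde setup:

use serde::Deserialize;

#[derive(Deserialize)]
struct FileUser {
    #[serde(rename = "Name")]
    name: String,
    #[serde(rename = "ID")]
    id: String,
}

fn main() -> Result<(), csv::Error> {
    // Hypothetical file contents; add_from_file reads the same shape from disk.
    let data = "Name,ID\nsomeuser,456\n";
    let mut rdr = csv::Reader::from_reader(data.as_bytes());
    for user in rdr.deserialize::<FileUser>() {
        let user = user?;
        println!("{} -> {}", user.id, user.name);
    }
    Ok(())
}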