Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
boring_nick | fc50c042e5 | |
boring_nick | 6e08dc2cf9 | |
boring_nick | eb2b5c876c | |
boring_nick | 9aef7121fc | |
boring_nick | 822f32ae5d | |
boring_nick | 25d290b640 | |
boring_nick | f41b4cd8cb | |
boring_nick | e888fb1933 | |
boring_nick | 0c2ca8ded4 | |
boring_nick | 1c5ff2f7a5 | |
boring_nick | 935842bafa |
|
@ -562,6 +562,27 @@ dependencies = [
|
|||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
|
||||
dependencies = [
|
||||
"csv-core",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv-core"
|
||||
version = "0.1.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
|
@ -1027,6 +1048,15 @@ version = "2.8.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.9"
|
||||
|
@ -1699,10 +1729,12 @@ dependencies = [
|
|||
"chrono",
|
||||
"clap",
|
||||
"clickhouse",
|
||||
"csv",
|
||||
"dashmap",
|
||||
"flate2",
|
||||
"futures",
|
||||
"indexmap 1.9.3",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
"metrics-prometheus",
|
||||
"mimalloc",
|
||||
|
|
|
@ -24,6 +24,7 @@ rand = "0.8.5"
|
|||
rayon = "1.7.0"
|
||||
reqwest = { version = "0.11.20", features = [
|
||||
"rustls-tls",
|
||||
"json",
|
||||
], default-features = false }
|
||||
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path"] }
|
||||
schemars = "0.8.13"
|
||||
|
@ -54,6 +55,8 @@ twitch_api2 = { version = "0.6.1", features = [
|
|||
twitch = { git = "https://github.com/jprochazk/twitch-rs", features = ["simd"] }
|
||||
axum-prometheus = "0.4.0"
|
||||
metrics-prometheus = "0.4.1"
|
||||
csv = "1.2.2"
|
||||
itertools = "0.11.0"
|
||||
async-trait = "0.1.73"
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use clap::{Parser, Subcommand};
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
|
@ -21,4 +22,10 @@ pub enum Command {
|
|||
#[clap(short, long, default_value_t = 1)]
|
||||
jobs: usize,
|
||||
},
|
||||
MigrateSupibot {
|
||||
#[clap(short, long)]
|
||||
logs_dir: PathBuf,
|
||||
#[clap(short, long)]
|
||||
users_file: PathBuf,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -76,6 +76,10 @@ async fn main() -> anyhow::Result<()> {
|
|||
channel_id,
|
||||
jobs,
|
||||
}) => migrate(db, source_dir, channel_id, jobs).await,
|
||||
Some(Command::MigrateSupibot {
|
||||
logs_dir,
|
||||
users_file,
|
||||
}) => migrator::supibot::run(config, db, &logs_dir, &users_file).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
mod reader;
|
||||
pub mod supibot;
|
||||
|
||||
use self::reader::{LogsReader, COMPRESSED_CHANNEL_FILE, UNCOMPRESSED_CHANNEL_FILE};
|
||||
use crate::{
|
||||
|
|
|
@ -0,0 +1,315 @@
|
|||
mod users;
|
||||
|
||||
use super::INSERT_BATCH_SIZE;
|
||||
use crate::{
|
||||
config::Config,
|
||||
db::schema::{Message, MESSAGES_TABLE},
|
||||
error::Error,
|
||||
migrator::supibot::users::UsersClient,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDate, NaiveDateTime};
|
||||
use clickhouse::{inserter::Inserter, Row};
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, HashSet},
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
path::Path,
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
// Example: 2023-06-23 14:46:26.588
|
||||
const DATE_FMT: &str = "%F %X%.3f";
|
||||
const USERS_REQUEST_CHUNK_SIZE: usize = 50;
|
||||
|
||||
pub async fn run(
|
||||
config: Config,
|
||||
db: clickhouse::Client,
|
||||
logs_path: &Path,
|
||||
users_file_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut users_client = UsersClient::default();
|
||||
users_client
|
||||
.add_from_file(users_file_path)
|
||||
.context("Could not read the users file")?;
|
||||
|
||||
let read_dir = std::fs::read_dir(logs_path)?;
|
||||
|
||||
for entry in read_dir {
|
||||
let entry = entry?;
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_str().expect("File name is invalid UTF-8");
|
||||
|
||||
if let Some(channel_name) = file_name.strip_suffix(".csv") {
|
||||
let channel_id = users_client
|
||||
.get_user_id_by_name(channel_name)
|
||||
.await?
|
||||
.context("Could not get channel id")?;
|
||||
info!("Migrating file {file_name}, channel id {channel_id}");
|
||||
|
||||
let inserter = db
|
||||
.inserter(MESSAGES_TABLE)?
|
||||
.with_timeouts(
|
||||
Some(Duration::from_secs(30)),
|
||||
Some(Duration::from_secs(180)),
|
||||
)
|
||||
.with_max_entries(INSERT_BATCH_SIZE)
|
||||
.with_period(Some(Duration::from_secs(15)));
|
||||
|
||||
let existing_dates = get_existing_dates(&db, &channel_id).await?;
|
||||
|
||||
let migrator = SupibotMigrator {
|
||||
non_cached_messages: HashMap::with_capacity(USERS_REQUEST_CHUNK_SIZE),
|
||||
inserter,
|
||||
channel_id: channel_id.to_owned(),
|
||||
channel_login: channel_name.to_owned(),
|
||||
invalid_user_ids: HashSet::new(),
|
||||
existing_dates,
|
||||
imported_count: 0,
|
||||
skipped_count: 0,
|
||||
};
|
||||
|
||||
info!("Adding channel {channel_id} to config");
|
||||
config
|
||||
.channels
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(channel_id.to_owned());
|
||||
config.save()?;
|
||||
|
||||
match migrator
|
||||
.migrate_channel(&entry.path(), &mut users_client)
|
||||
.await
|
||||
{
|
||||
Ok((imported_count, skipped_count)) => {
|
||||
info!("Channel {channel_name} successfully migrated:");
|
||||
info!("Imported {imported_count} messages");
|
||||
info!("{skipped_count} messages were skipped due to duplicate dates",);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Could not migrate channel {channel_name}: {err:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_existing_dates(
|
||||
db: &clickhouse::Client,
|
||||
channel_id: &str,
|
||||
) -> Result<HashSet<NaiveDate>, Error> {
|
||||
info!("Getting existing log dates");
|
||||
|
||||
#[derive(Row, Deserialize)]
|
||||
struct DateRow {
|
||||
datetime: u32,
|
||||
}
|
||||
|
||||
let raw_dates = db
|
||||
.query(
|
||||
"SELECT DISTINCT toStartOfDay(timestamp) as datetime FROM message WHERE channel_id = ?",
|
||||
)
|
||||
.bind(channel_id)
|
||||
.fetch_all::<DateRow>()
|
||||
.await?;
|
||||
let dates: HashSet<NaiveDate> = raw_dates
|
||||
.into_iter()
|
||||
.map(|row| {
|
||||
let datetime = NaiveDateTime::from_timestamp_opt(row.datetime as i64, 0)
|
||||
.expect("Invalid timestamp returned by the db");
|
||||
datetime.date()
|
||||
})
|
||||
.collect();
|
||||
info!(
|
||||
"Found {} dates where the channel already has logs",
|
||||
dates.len()
|
||||
);
|
||||
|
||||
Ok(dates)
|
||||
}
|
||||
|
||||
/// State for migrating a single channel's Supibot CSV file into the db.
struct SupibotMigrator {
    /// Days that already have logs in the db; messages falling on these
    /// days are skipped (see `write_message`).
    existing_dates: HashSet<NaiveDate>,
    /// Messages whose users are not cached
    /// Indexed by user id
    non_cached_messages: HashMap<String, Vec<SupibotMessage>>,
    /// Batched writer into the messages table.
    inserter: Inserter<Message<'static>>,
    channel_id: String,
    channel_login: String,
    /// User ids the batch lookup could not resolve; reported at the end.
    invalid_user_ids: HashSet<String>,
    /// Messages written to the inserter so far.
    imported_count: u64,
    /// Messages skipped because their day already had logs.
    skipped_count: u64,
}
|
||||
|
||||
impl SupibotMigrator {
|
||||
async fn migrate_channel(
|
||||
mut self,
|
||||
file_path: &Path,
|
||||
users_client: &mut UsersClient,
|
||||
) -> anyhow::Result<(u64, u64)> {
|
||||
let file = File::open(file_path)?;
|
||||
let reader = BufReader::new(file);
|
||||
let rdr = csv::Reader::from_reader(reader);
|
||||
|
||||
for (i, result) in rdr.into_deserialize::<SupibotMessage>().enumerate() {
|
||||
if i % 100_000 == 0 {
|
||||
info!("Processing message {}", i + 1);
|
||||
}
|
||||
let supibot_message = result?;
|
||||
|
||||
if supibot_message.historic == 0 {
|
||||
if let Some(user_login) =
|
||||
users_client.get_cached_user_login(&supibot_message.platform_id)
|
||||
{
|
||||
self.write_message(&supibot_message, user_login, &supibot_message.platform_id)
|
||||
.await?;
|
||||
} else {
|
||||
self.non_cached_messages
|
||||
.entry(supibot_message.platform_id.clone())
|
||||
.or_default()
|
||||
.push(supibot_message);
|
||||
|
||||
if self.non_cached_messages.len() >= USERS_REQUEST_CHUNK_SIZE {
|
||||
self.flush_non_cached(users_client).await?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let user_id = users_client
|
||||
.get_user_id_by_name(&supibot_message.platform_id)
|
||||
.await?
|
||||
// Used when the user id cannot be retrieved
|
||||
.unwrap_or_default();
|
||||
|
||||
self.write_message(&supibot_message, &supibot_message.platform_id, &user_id)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let stats = self
|
||||
.inserter
|
||||
.commit()
|
||||
.await
|
||||
.context("Could not flush messages")?;
|
||||
if stats.entries > 0 {
|
||||
info!(
|
||||
"DB: {} entries ({} transactions) have been inserted",
|
||||
stats.entries, stats.transactions,
|
||||
);
|
||||
}
|
||||
}
|
||||
self.flush_non_cached(users_client).await?;
|
||||
|
||||
let stats = self
|
||||
.inserter
|
||||
.end()
|
||||
.await
|
||||
.context("Could not flush messages")?;
|
||||
if stats.entries > 0 {
|
||||
info!(
|
||||
"DB: {} entries ({} transactions) have been inserted",
|
||||
stats.entries, stats.transactions,
|
||||
);
|
||||
}
|
||||
|
||||
if !self.invalid_user_ids.is_empty() {
|
||||
error!("Invalid user ids: {:?}", self.invalid_user_ids);
|
||||
}
|
||||
|
||||
Ok((self.imported_count, self.skipped_count))
|
||||
}
|
||||
|
||||
async fn flush_non_cached(&mut self, users_client: &mut UsersClient) -> anyhow::Result<()> {
|
||||
let user_ids = self.non_cached_messages.keys().collect::<Vec<_>>();
|
||||
let users = users_client.get_users(&user_ids).await?;
|
||||
|
||||
let non_cached_messages = std::mem::take(&mut self.non_cached_messages);
|
||||
for (user_id, messages) in non_cached_messages {
|
||||
match users.get(&user_id) {
|
||||
Some(user_login) => {
|
||||
for message in messages {
|
||||
self.write_message(&message, user_login, &user_id).await?;
|
||||
// write_message(message, user, &self.channel_user, &mut self.inserter)
|
||||
// .await?;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
self.invalid_user_ids.insert(user_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_message(
|
||||
&mut self,
|
||||
supibot_message: &SupibotMessage,
|
||||
user_login: &str,
|
||||
user_id: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let text = &supibot_message.text;
|
||||
let datetime = NaiveDateTime::parse_from_str(&supibot_message.posted, DATE_FMT).unwrap();
|
||||
|
||||
if self.existing_dates.contains(&datetime.date()) {
|
||||
self.skipped_count += 1;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timestamp = datetime.timestamp_millis() as u64;
|
||||
let raw = format!(
|
||||
"@id=;returning-chatter=0;turbo=0;mod=0;room-id={channel_id};subscriber=;tmi-sent-ts={timestamp};badge-info=;user-id={user_id};badges=;user-type=;display-name={display_name};flags=;emotes=;first-msg=0;color={color} :{login}!{login}@{login}.tmi.twitch.tv PRIVMSG #{channel_login} :{text}",
|
||||
channel_id = self.channel_id,
|
||||
channel_login = self.channel_login,
|
||||
display_name = user_login,
|
||||
user_id = user_id,
|
||||
login = user_login,
|
||||
color = "",
|
||||
);
|
||||
|
||||
let message = Message {
|
||||
channel_id: Cow::Owned(self.channel_id.to_owned()),
|
||||
user_id: Cow::Owned(user_id.to_owned()),
|
||||
timestamp,
|
||||
raw: Cow::Owned(raw),
|
||||
};
|
||||
|
||||
self.inserter.write(&message).await?;
|
||||
self.imported_count += 1;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// One row of a Supibot CSV log dump.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct SupibotMessage {
    // Row id from the dump; not read by the migrator.
    #[serde(rename = "ID")]
    pub _id: u64,
    // For rows with `historic == 0` this is treated as a user id; for
    // historic rows it is looked up as a user name (see `migrate_channel`).
    #[serde(rename = "Platform_ID")]
    pub platform_id: String,
    // Flag distinguishing the two `platform_id` interpretations above;
    // only `== 0` vs non-zero is checked.
    pub historic: u8,
    pub text: String,
    // Message timestamp string, parsed with `DATE_FMT`
    // (e.g. "2023-06-23 14:46:26.588").
    pub posted: String,
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::DATE_FMT;
    use chrono::{NaiveDate, NaiveDateTime, NaiveTime};

    /// `DATE_FMT` must parse the exact timestamp layout Supibot emits,
    /// including the millisecond fraction.
    #[test]
    fn parse_date() {
        let parsed = NaiveDateTime::parse_from_str("2023-06-23 14:46:26.588", DATE_FMT).unwrap();
        let expected_date = NaiveDate::from_ymd_opt(2023, 6, 23).unwrap();
        let expected_time = NaiveTime::from_hms_milli_opt(14, 46, 26, 588).unwrap();
        assert_eq!(parsed, NaiveDateTime::new(expected_date, expected_time));
    }
}
|
|
@ -0,0 +1,150 @@
|
|||
use anyhow::{anyhow, Context};
|
||||
use serde::Deserialize;
|
||||
use std::{collections::HashMap, fs::File, io::BufReader, path::Path};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
/// Resolves Twitch user ids and names via the IVR API, with an in-memory
/// cache that can be pre-seeded from a CSV file.
#[derive(Default)]
pub struct UsersClient {
    client: reqwest::Client,
    // Ids mapped to logins.
    users: HashMap<String, String>,
    // Names mapped to ids
    // `None` caches a failed lookup so it is not retried.
    names: HashMap<String, Option<String>>,
}
|
||||
|
||||
/// One row of the users CSV file loaded by `UsersClient::add_from_file`.
#[derive(Deserialize)]
struct FileUser {
    #[serde(rename = "Name")]
    name: String,
    #[serde(rename = "ID")]
    id: String,
}
|
||||
|
||||
impl UsersClient {
|
||||
pub fn add_from_file(&mut self, file_path: &Path) -> anyhow::Result<()> {
|
||||
info!("Loading users from {file_path:?}");
|
||||
|
||||
let file = File::open(file_path)?;
|
||||
let reader = BufReader::new(file);
|
||||
let rdr = csv::Reader::from_reader(reader);
|
||||
|
||||
for user in rdr.into_deserialize::<FileUser>() {
|
||||
let user = user?;
|
||||
self.users.insert(user.id.clone(), user.name.clone());
|
||||
self.names.insert(user.name, Some(user.id));
|
||||
}
|
||||
|
||||
info!("{} users loaded", self.users.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_users(
|
||||
&mut self,
|
||||
ids: &[impl AsRef<str>],
|
||||
) -> anyhow::Result<HashMap<String, String>> {
|
||||
let mut ids_to_request = Vec::with_capacity(ids.len());
|
||||
let mut response_users = HashMap::with_capacity(ids.len());
|
||||
|
||||
for id in ids {
|
||||
match self.users.get(id.as_ref()) {
|
||||
Some(name) => {
|
||||
response_users.insert(id.as_ref().to_owned(), name.clone());
|
||||
}
|
||||
None => {
|
||||
ids_to_request.push(id.as_ref());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let request_futures = ids_to_request.chunks(50).map(|chunk| {
|
||||
debug!("Requesting a chunk of {} users", chunk.len());
|
||||
|
||||
async {
|
||||
let response = self
|
||||
.client
|
||||
.get("https://api.ivr.fi/v2/twitch/user")
|
||||
.query(&[("id", chunk.join(","))])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(anyhow!(
|
||||
"Got an error from IVR API: {} {}",
|
||||
response.status(),
|
||||
response.text().await?
|
||||
));
|
||||
}
|
||||
Ok(response.json::<Vec<IvrUser>>().await?)
|
||||
}
|
||||
});
|
||||
// let results = join_all(request_futures).await;
|
||||
let mut results = Vec::new();
|
||||
for future in request_futures {
|
||||
results.push(future.await);
|
||||
}
|
||||
|
||||
for result in results {
|
||||
let api_response = result?;
|
||||
for user in api_response {
|
||||
self.users.insert(user.id.clone(), user.login.clone());
|
||||
response_users.insert(user.id.clone(), user.login);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(response_users)
|
||||
}
|
||||
|
||||
pub async fn get_user_id_by_name(&mut self, name: &str) -> anyhow::Result<Option<String>> {
|
||||
match self.names.get(name) {
|
||||
Some(id) => Ok(id.clone()),
|
||||
None => {
|
||||
debug!("Fetching info for name {name}");
|
||||
let response = self
|
||||
.client
|
||||
.get("https://api.ivr.fi/v2/twitch/user")
|
||||
.query(&[("login", name)])
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(anyhow!(
|
||||
"Got an error from IVR API: {} {}",
|
||||
response.status(),
|
||||
response.text().await?
|
||||
));
|
||||
}
|
||||
|
||||
let users: Vec<IvrUser> = response
|
||||
.json()
|
||||
.await
|
||||
.context("Could not deserialize IVR response")?;
|
||||
|
||||
match users.into_iter().next() {
|
||||
Some(user) => {
|
||||
self.names.insert(user.login.clone(), Some(user.id.clone()));
|
||||
self.users.insert(user.id.clone(), user.login.clone());
|
||||
Ok(Some(user.id))
|
||||
}
|
||||
None => {
|
||||
warn!("User {name} cannot be retrieved");
|
||||
self.names.insert(name.to_owned(), None);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_cached_user_login(&self, id: &str) -> Option<&str> {
|
||||
self.users.get(id).map(|s| s.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// A user record as deserialized from the IVR `twitch/user` API response.
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IvrUser {
    pub id: String,
    pub display_name: String,
    pub login: String,
    pub chat_color: Option<String>,
}
|
Loading…
Reference in New Issue