parallel migration

This commit is contained in:
boring_nick 2023-06-05 22:50:08 +03:00
parent df4a94e006
commit 9586ddb207
3 changed files with 62 additions and 32 deletions

View File

@ -17,5 +17,8 @@ pub enum Command {
/// List of channel ids to migrate (None specified = migrate all)
#[clap(short, long, value_parser)]
channel_id: Vec<String>,
/// Parallel migration jobs
#[clap(short, long, default_value_t = 1)]
jobs: usize,
},
}

View File

@ -73,7 +73,8 @@ async fn main() -> anyhow::Result<()> {
Some(Command::Migrate {
source_dir,
channel_id,
}) => migrate(db, source_dir, channel_id).await,
jobs,
}) => migrate(db, source_dir, channel_id, jobs).await,
}
}
@ -138,9 +139,10 @@ async fn migrate(
db: clickhouse::Client,
source_logs_path: String,
channel_ids: Vec<String>,
jobs: usize,
) -> anyhow::Result<()> {
let migrator = Migrator::new(db, &source_logs_path, channel_ids).await?;
migrator.run().await
let migrator = Migrator::new(db, source_logs_path, channel_ids).await?;
migrator.run(jobs).await
}
async fn generate_token(config: &Config) -> anyhow::Result<AppAccessToken> {

View File

@ -16,40 +16,45 @@ use std::{
fs::File,
io::{BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::Semaphore;
use tracing::{debug, info, warn};
use twitch_irc::message::IRCMessage;
const LINES_BATCH_SIZE: usize = 32768;
#[derive(Clone)]
pub struct Migrator {
db: clickhouse::Client,
source_logs: LogsReader,
channel_ids: Vec<String>,
source_logs_path: String,
channel_ids: Arc<Vec<String>>,
}
impl Migrator {
pub async fn new(
db: clickhouse::Client,
source_logs_path: &str,
source_logs_path: String,
channel_ids: Vec<String>,
) -> anyhow::Result<Migrator> {
let source_logs = LogsReader::new(source_logs_path).await?;
Ok(Self {
db,
source_logs,
channel_ids,
source_logs_path,
channel_ids: Arc::new(channel_ids),
})
}
pub async fn run(self) -> anyhow::Result<()> {
pub async fn run(self, parallel_count: usize) -> anyhow::Result<()> {
let source_logs = LogsReader::new(&self.source_logs_path).await?;
info!("Migrating channels {:?}", self.channel_ids);
let started_at = Instant::now();
let channels = source_logs.get_stored_channels().await?;
let channels = self.source_logs.get_stored_channels().await?;
let semaphore = Arc::new(Semaphore::new(parallel_count));
let mut handles = Vec::with_capacity(parallel_count);
for channel_id in channels {
if !self.channel_ids.is_empty() && !self.channel_ids.contains(&channel_id) {
@ -57,36 +62,55 @@ impl Migrator {
continue;
}
let available_logs = self
.source_logs
let available_logs = source_logs
.get_available_channel_logs(&channel_id, true)
.await?;
for (year, months) in available_logs {
for (month, days) in months {
let mut inserter = self
.db
.inserter(MESSAGES_TABLE)?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_entries(750_000)
.with_period(Some(Duration::from_secs(15)));
let permit = semaphore.clone().acquire_owned().await.unwrap();
let migrator = self.clone();
let channel_id = channel_id.clone();
let root_path = source_logs.root_path.clone();
for day in days {
let date = Utc
.with_ymd_and_hms(year.try_into().unwrap(), month, day, 0, 0, 0)
.unwrap();
info!(
"Migrating channel {channel_id} date {date}",
date = date.format("%Y-%m-%d")
);
self.migrate_day(&channel_id, date, &mut inserter).await?;
}
let handle = tokio::spawn(async move {
let mut inserter = migrator
.db
.inserter(MESSAGES_TABLE)?
.with_timeouts(
Some(Duration::from_secs(5)),
Some(Duration::from_secs(20)),
)
.with_max_entries(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.end().await?;
for day in days {
let date = Utc
.with_ymd_and_hms(year.try_into().unwrap(), month, day, 0, 0, 0)
.unwrap();
info!(
"Migrating channel {channel_id} date {date}",
date = date.format("%Y-%m-%d")
);
migrator
.migrate_day(&root_path, &channel_id, date, &mut inserter)
.await?;
}
inserter.end().await?;
drop(permit);
Result::<_, anyhow::Error>::Ok(())
});
handles.push(handle);
}
}
}
for handle in handles {
handle.await.unwrap()?;
}
let elapsed = started_at.elapsed();
info!("Migration finished in {elapsed:?}");
@ -95,11 +119,12 @@ impl Migrator {
async fn migrate_day<'a>(
&self,
root_path: &Path,
channel_id: &'a str,
date: DateTime<Utc>,
inserter: &mut Inserter<Message<'a>>,
) -> anyhow::Result<()> {
let day_path = get_day_path(&self.source_logs.root_path, channel_id, date);
let day_path = get_day_path(root_path, channel_id, date);
let compressed_file_path = day_path.join(COMPRESSED_CHANNEL_FILE);
let uncompressed_file_path = day_path.join(UNCOMPRESSED_CHANNEL_FILE);