init
This commit is contained in:
commit
8394ebc0de
26
.drone.yml
Normal file
26
.drone.yml
Normal file
@ -0,0 +1,26 @@
|
||||
kind: pipeline
|
||||
type: kubernetes
|
||||
name: default
|
||||
metadata:
|
||||
namespace: git
|
||||
concurrency:
|
||||
limit: 1
|
||||
trigger:
|
||||
branch:
|
||||
- main
|
||||
event:
|
||||
- push
|
||||
steps:
|
||||
- name: build
|
||||
image: docker
|
||||
privileged: true
|
||||
environment:
|
||||
TOKEN:
|
||||
from_secret: gitea
|
||||
TOKEN_DOCKER:
|
||||
from_secret: docker_hub
|
||||
commands:
|
||||
- dockerd &
|
||||
- docker login -u kieran -p $TOKEN git.v0l.io
|
||||
- docker buildx build --push -t git.v0l.io/kieran/ingress2analytics:latest .
|
||||
- kill $(cat /var/run/docker.pid)
|
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
/target
|
||||
.idea/
|
2322
Cargo.lock
generated
Normal file
2322
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
17
Cargo.toml
Normal file
17
Cargo.toml
Normal file
@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "ingress2analytics"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
clap = { version = "4.5.27", features = ["derive"] }
|
||||
k8s-openapi = { version = "0.24.0", features = ["latest"] }
|
||||
kube = "0.98.0"
|
||||
log = "0.4.25"
|
||||
pretty_env_logger = "0.5.0"
|
||||
tokio = { version = "1.43.0", features = ["macros", "rt", "rt-multi-thread"] }
|
||||
futures-util = "0.3.31"
|
||||
nginx-log-parser = { git = "https://github.com/themasch/nginx-log-parser.git" }
|
||||
reqwest = "0.12.12"
|
||||
serde = "1.0.217"
|
||||
serde_json = "1.0.137"
|
17
Dockerfile
Normal file
17
Dockerfile
Normal file
@ -0,0 +1,17 @@
|
||||
ARG IMAGE=rust:bookworm
|
||||
ARG FEATURES
|
||||
|
||||
FROM $IMAGE AS build
|
||||
WORKDIR /app/src
|
||||
COPY src src
|
||||
COPY Cargo.lock Cargo.lock
|
||||
COPY Cargo.toml Cargo.toml
|
||||
RUN cargo install --path . --root /app/build --features "${FEATURES}"
|
||||
|
||||
FROM $IMAGE AS runner
|
||||
LABEL org.opencontainers.image.source="https://git.v0l.io/Kieran/ingress2analytics"
|
||||
LABEL org.opencontainers.image.licenses="MIT"
|
||||
LABEL org.opencontainers.image.authors="Kieran"
|
||||
WORKDIR /app
|
||||
COPY --from=build /app/build .
|
||||
ENTRYPOINT ["./bin/ingress2analytics"]
|
122
src/main.rs
Normal file
122
src/main.rs
Normal file
@ -0,0 +1,122 @@
|
||||
use crate::plausible::{Event, PlausibleAnalytics};
|
||||
use clap::Parser;
|
||||
use futures_util::io::AsyncBufReadExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use k8s_openapi::api::core::v1::Pod;
|
||||
use kube::api::{ListParams, LogParams};
|
||||
use kube::client::ClientBuilder;
|
||||
use kube::config::Kubeconfig;
|
||||
use kube::{Api, Client, Config};
|
||||
use log::{error, info};
|
||||
use nginx_log_parser::Format;
|
||||
use std::str::FromStr;
|
||||
|
||||
mod plausible;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(about)]
|
||||
struct App {
|
||||
#[arg(long)]
|
||||
config: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
namespace: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
plausible: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let app = App::parse();
|
||||
|
||||
let namespace = if let Some(n) = app.namespace {
|
||||
n
|
||||
} else {
|
||||
"ingress-nginx".to_string()
|
||||
};
|
||||
|
||||
let client = if let Some(c) = app.config {
|
||||
let cfg = Kubeconfig::read_from(c.as_str())?;
|
||||
let cfg = Config::from_custom_kubeconfig(cfg, &Default::default()).await?;
|
||||
ClientBuilder::try_from(cfg)?.build()
|
||||
} else {
|
||||
Client::try_default().await?
|
||||
};
|
||||
|
||||
let api: Api<Pod> = Api::namespaced(client, &namespace);
|
||||
let pods = api
|
||||
.list(&ListParams::default().labels("app.kubernetes.io/component=controller"))
|
||||
.await?;
|
||||
info!("Found {} pods!", pods.items.len());
|
||||
|
||||
let plausible = PlausibleAnalytics::new(&app.plausible);
|
||||
let mut tasks = Vec::new();
|
||||
for pod in pods.items {
|
||||
let api = api.clone();
|
||||
let name = if let Some(n) = pod.metadata.name {
|
||||
n.clone()
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
let sender = plausible.sender();
|
||||
let t = tokio::spawn(async move {
|
||||
if let Ok(log_stream) = api
|
||||
.log_stream(
|
||||
&name,
|
||||
&LogParams {
|
||||
follow: true,
|
||||
container: Some("controller".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
let mut stream = log_stream.lines();
|
||||
let format = r#"$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_length $request_time [$proxy_upstream_name] [$proxy_alternative_upstream_name] $upstream_addr $upstream_response_length $upstream_response_time $upstream_status $req_id"#;
|
||||
let format = Format::from_str(format).unwrap();
|
||||
while let Ok(Some(line)) = stream.try_next().await {
|
||||
if let Some(entry) = format.parse(&line) {
|
||||
let url = if let Some(r) = entry
|
||||
.get("request")
|
||||
.and_then(|r| r.split(" ").skip(1).next())
|
||||
{
|
||||
r
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// assume tracking call
|
||||
if url.ends_with("/api/event") {
|
||||
continue;
|
||||
}
|
||||
|
||||
sender
|
||||
.send(Event {
|
||||
name: "pageview".to_string(),
|
||||
domain: entry
|
||||
.get("proxy_upstream_name")
|
||||
.unwrap_or("unknown")
|
||||
.to_string(),
|
||||
url: url.to_string(),
|
||||
referrer: entry.get("http_referer").map(|r| r.to_string()),
|
||||
user_agent: entry.get("http_user_agent").map(|r| r.to_string()),
|
||||
xff: entry.get("remote_addr").map(|r| r.to_string()),
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("Failed to log pod {}", name);
|
||||
}
|
||||
});
|
||||
tasks.push(t);
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
56
src/plausible.rs
Normal file
56
src/plausible.rs
Normal file
@ -0,0 +1,56 @@
|
||||
use log::{info, warn};
|
||||
use reqwest::ClientBuilder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
pub name: String,
|
||||
pub domain: String,
|
||||
pub url: String,
|
||||
pub referrer: Option<String>,
|
||||
#[serde(skip_serializing)]
|
||||
pub user_agent: Option<String>,
|
||||
#[serde(skip_serializing)]
|
||||
pub xff: Option<String>,
|
||||
}
|
||||
|
||||
pub struct PlausibleAnalytics {
|
||||
tx: UnboundedSender<Event>,
|
||||
}
|
||||
|
||||
impl PlausibleAnalytics {
|
||||
pub fn new(url: &str) -> Self {
|
||||
let (tx, mut rx) = unbounded_channel::<Event>();
|
||||
|
||||
let c = ClientBuilder::new().build().unwrap();
|
||||
let url = url.to_owned();
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
let body = serde_json::to_string(&msg).unwrap();
|
||||
match c
|
||||
.post(format!("{}/api/event", &url))
|
||||
.header("User-Agent", msg.user_agent.unwrap_or("".to_string()))
|
||||
.header("X-Forwarded-For", msg.xff.unwrap_or("".to_string()))
|
||||
.header("content-type", "application/json")
|
||||
.body(body)
|
||||
.timeout(Duration::from_secs(30))
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(_v) => {
|
||||
info!("sent event: {} {}", &msg.domain, &msg.url);
|
||||
}
|
||||
Err(e) => warn!("Failed to track: {}", e),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self { tx }
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> UnboundedSender<Event> {
|
||||
self.tx.clone()
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user