#[macro_use]
extern crate error_chain;
extern crate futures;
extern crate rdkafka;

use std::mem::size_of;
use std::ptr::copy_nonoverlapping;
use std::str::from_utf8;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use futures::Stream;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, EmptyConsumerContext};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Message;

error_chain! {
    errors {}
}

/*
#[derive(Clone)]
struct Table<R, C, V> {
    table: HashMap<R, HashMap<C, V>>,
}

impl<R, C, V> Table<R, C, V>
    where R: Eq + Hash,
          C: Eq + Hash,
{
    pub fn new() -> Table<R, C, V> {
        Table { table: HashMap::new() }
    }

    pub fn insert(&mut self, row: R, column: C, value: V) -> Option<V> {
        self.table.entry(row)
            .or_insert_with(|| HashMap::new())
            .insert(column, value)
    }

    pub fn get(&mut self, row: R, column: C) -> Option<&V> {
        self.table.get(&row)
            .and_then(|h| h.get(&column))
    }
}

impl<R, C, V> Debug for Table<R, C, V>
    where R: Eq + Hash + Debug,
          C: Eq + Hash + Debug,
          V: Debug,
{
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        self.table.fmt(f)
    }
}
*/

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TopicPartition {
    pub topic: String,
    pub partition: i32,
}

impl TopicPartition {
    pub fn new(topic: String, partition: i32) -> TopicPartition {
        TopicPartition {
            topic: topic,
            partition: partition,
        }
    }
}

type ConsumerGroup = String;
type Offset = i64;

trait Watcher {
    fn start(&mut self);
    fn receiver(&self) -> &mpsc::Receiver<OffsetMessage>;
}

struct KafkaWatcher {
    brokers: String,
    sender: mpsc::Sender<OffsetMessage>,
    receiver: mpsc::Receiver<OffsetMessage>,
}

impl KafkaWatcher {
    pub fn new(brokers: String) -> KafkaWatcher {
        let (sender, receiver) = mpsc::channel();
        KafkaWatcher {
            brokers: brokers,
            sender: sender,
            receiver: receiver,
        }
    }
}

#[derive(Debug)]
enum OffsetMessage {
    OffsetSet {
        group: ConsumerGroup,
        topic_partition: TopicPartition,
        offset: Offset,
    },
    OffsetDelete {
        group: ConsumerGroup,
        topic_partition: TopicPartition,
    },
    GroupMetadata, // We don't read these messages
}
macro_rules! bytes_into {
    ($src:expr, $type:ty, $offset:expr) => ({
        let size = size_of::<$type>();
        if $src.len() >= $offset + size {
            let mut value: $type = 0;
            unsafe {
                copy_nonoverlapping(
                    $src.as_ptr().offset($offset as isize),
                    &mut value as *mut $type as *mut u8,
                    size,
                );
            }
            // Kafka writes integers in big-endian order; convert to the
            // host's native order before handing the value back.
            Some((<$type>::from_be(value), $offset + size))
        } else {
            None
        }
    });
}

fn bytes_into_str<'a>(src: &'a [u8], offset: usize) -> Option<(&'a str, usize)> {
    // Strings use the format [length|characters]: the length is encoded
    // on 2 bytes, then come `length` UTF-8 characters.
    bytes_into!(src, i16, offset)
        .and_then(|(length, new_offset)| {
            let end_offset = new_offset + length as usize;
            if end_offset > src.len() {
                return None;
            }
            match from_utf8(&src[new_offset..end_offset]) {
                Ok(s) => Some((s, end_offset)),
                _ => None,
            }
        })
}

fn parse_offset_message(key: &[u8], payload: &[u8]) -> Option<OffsetMessage> {
    // Key layout:
    // https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1076
    let (group, offset) = bytes_into_str(key, 2)?;
    let (topic, offset) = bytes_into_str(key, offset)?;
    let partition = bytes_into!(key, i32, offset)?.0;

    let group = group.to_owned();
    let topic_partition = TopicPartition::new(topic.to_owned(), partition);

    if payload.is_empty() {
        // An empty payload is a tombstone: the committed offset was deleted.
        Some(OffsetMessage::OffsetDelete {
            group: group,
            topic_partition: topic_partition,
        })
    } else {
        // Payload layout:
        // https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1105
        // The first 2 bytes are the version, then comes the offset on 8 bytes.
        bytes_into!(payload, i64, 2)
            .map(|(consumer_offset, _)| {
                OffsetMessage::OffsetSet {
                    group: group,
                    topic_partition: topic_partition,
                    offset: consumer_offset as Offset,
                }
            })
    }
}

fn parse_message(message: BorrowedMessage) -> Result<OffsetMessage> {
    let key = message.key().ok_or("Message has no key")?;
    let payload = message.payload().ok_or("Message has no payload")?;

    // The first 2 bytes of the key hold the key schema version; the macro
    // already converts from big-endian.
    let version = bytes_into!(key, i16, 0)
        .map(|(v, _)| v)
        .ok_or("Unable to read offset message version")?;

    match version {
        0 | 1 => parse_offset_message(key, payload).ok_or("Unable to read offset message".into()),
        2 => Ok(OffsetMessage::GroupMetadata),
        _ => Err("Unknown key version".into()),
    }
}

impl Watcher for KafkaWatcher {
    fn start(&mut self) {
        let consumer = ClientConfig::new()
            .set("bootstrap.servers", self.brokers.as_str())
            .set("group.id", "offset-consumer")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer<EmptyConsumerContext>>()
            .expect("Consumer creation failed");

        // Committed offsets live in the internal `__consumer_offsets` topic.
        consumer.subscribe(&["__consumer_offsets"])
            .expect("Subscription failed");

        let sender = self.sender.clone();
        thread::spawn(move || {
            for message in consumer.start_with(Duration::from_millis(200), true).wait() {
                if let Ok(Ok(message)) = message {
                    let sent = parse_message(message)
                        .and_then(|offset_message| {
                            sender.send(offset_message)
                                .chain_err(|| "Unable to send")
                        });
                    if let Err(e) = sent {
                        println!("{:?}", e);
                    }
                } else {
                    println!("{:?}", message);
                }
            }
        });
    }

    fn receiver(&self) -> &mpsc::Receiver<OffsetMessage> {
        &self.receiver
    }
}

fn main() {
    let mut watcher = KafkaWatcher::new("localhost:9092".to_owned());
    watcher.start();

    let receiver = watcher.receiver();
    println!("{:?}", receiver.recv().unwrap());
}
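
// A minimal sketch of how the byte-level helpers above can be exercised in
// isolation. The sample buffers are hand-crafted for illustration (an
// assumption, not bytes captured from a real broker), but they follow the
// same big-endian, length-prefixed layout the __consumer_offsets parsers
// expect. The module and test names are hypothetical.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn reads_big_endian_integers() {
        // 0x0102 read as a big-endian i16 is 258, whatever the host order.
        let src = [0x01u8, 0x02];
        let (value, next_offset) = bytes_into!(src, i16, 0).unwrap();
        assert_eq!(value, 258);
        assert_eq!(next_offset, 2);
        // Reading past the end of the buffer yields None instead of panicking.
        assert!(bytes_into!(src, i32, 0).is_none());
    }

    #[test]
    fn reads_length_prefixed_strings() {
        // [0x00, 0x05] is the 2-byte big-endian length, then 5 UTF-8 bytes.
        let src = [0x00, 0x05, b'h', b'e', b'l', b'l', b'o'];
        let (s, next_offset) = bytes_into_str(&src, 0).unwrap();
        assert_eq!(s, "hello");
        assert_eq!(next_offset, 7);
    }
}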