#[macro_use] extern crate error_chain;
extern crate futures;
extern crate rdkafka;

use std::clone::Clone;
use std::mem::size_of;
use std::ptr::copy_nonoverlapping;
use std::str::from_utf8;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use futures::Stream;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::EmptyConsumerContext;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Message;

// Generates this crate's error plumbing through the `error_chain` crate; the
// `Result` alias and the `chain_err` adaptor used further down come from here.
// No custom error kinds are declared: the `errors` section is empty.
error_chain! {
    errors {

    }
}

/*
#[derive(Clone)]
struct Table<R, C, V> {
    table: HashMap<R, HashMap<C, V>>,
}

impl<R: Hash + Eq, C: Hash + Eq, V> Table<R, C, V> {
    pub fn new() -> Table<R, C, V> {
        Table { table: HashMap::new() }
    }

    pub fn insert(&mut self, row: R, column: C, value: V) -> Option<V> {
        self.table.entry(row)
            .or_insert_with(|| HashMap::new())
            .insert(column, value)
    }

    pub fn get(&mut self, row: R, column: C) -> Option<&V> {
        self.table.get(&row)
            .and_then(|h| h.get(&column))
    }
}

impl <R, C, V> Debug for Table<R, C, V>
    where R: Eq + Hash + Debug,
          C: Eq + Hash + Debug,
          V: Debug
{
    fn fmt(&self, f: &mut Formatter) -> Result {
        self.table.fmt(f)
    }
}
*/

/// A topic name paired with one of its partition numbers.
///
/// Derives `Eq` and `Hash`, so values can serve as map keys.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TopicPartition {
    /// Name of the topic.
    pub topic: String,
    /// Partition number within `topic`.
    pub partition: i32,
}

impl TopicPartition {
    /// Builds a `TopicPartition` from an owned topic name and a partition number.
    pub fn new(topic: String, partition: i32) -> TopicPartition {
        Self { topic, partition }
    }
}

/// Name of a consumer group, as carried in the `group` field of `OffsetMessage`.
type ConsumerGroup = String;
/// A committed offset value, as carried in `OffsetMessage::OffsetSet`.
type Offset = i64;

/// A source of `OffsetMessage`s that are handed out through an `mpsc` channel.
trait Watcher {
    /// Starts the watcher.
    fn start(&mut self);
    /// Returns the receiving end of the channel on which `OffsetMessage`s arrive.
    fn receiver(&self) -> &mpsc::Receiver<OffsetMessage>;
}

/// A `Watcher` backed by a Kafka consumer.
///
/// Both halves of the channel are stored here: `sender` is cloned for the
/// consuming thread, and `receiver` is lent out to the caller.
struct KafkaWatcher {
    /// Value passed to the consumer as `bootstrap.servers`.
    brokers: String,
    /// Producing half of the channel.
    sender: mpsc::Sender<OffsetMessage>,
    /// Consuming half of the channel.
    receiver: mpsc::Receiver<OffsetMessage>,
}

impl KafkaWatcher {
    /// Creates a watcher for `brokers` together with a fresh channel.
    /// Nothing is consumed until `start` is called.
    pub fn new(brokers: String) -> KafkaWatcher {
        let (tx, rx) = mpsc::channel::<OffsetMessage>();
        KafkaWatcher {
            brokers,
            sender: tx,
            receiver: rx,
        }
    }
}

/// A decoded record, as produced by `parse_message`.
#[derive(Debug)]
enum OffsetMessage {
    /// An offset was committed: produced when the record payload is not empty.
    OffsetSet {
        group: ConsumerGroup,
        topic_partition: TopicPartition,
        offset: Offset,
    },
    /// A committed offset was removed: produced when the record payload is empty.
    OffsetDelete {
        group: ConsumerGroup,
        topic_partition: TopicPartition,
    },
    /// Record whose key version is 2; its content is not decoded.
    GroupMetadata, // We don't read these messages
}

/// Reads a primitive integer of type `$type` out of the byte slice `$src`,
/// starting at byte `$offset`.
///
/// Evaluates to `Some((value, next_offset))`, where `next_offset` is the index
/// of the first byte after the value, or to `None` when fewer than
/// `size_of::<$type>()` bytes are available at `$offset` (including when
/// `$offset` lies past the end of the slice).
///
/// The bytes are copied verbatim, so `value` is in *native* byte order; callers
/// decoding big-endian data must convert it themselves (e.g. `i32::from_be`).
///
/// `$type` must be an integer type: the value is initialised from the literal `0`.
macro_rules! bytes_into {
    ($src:expr, $type:ty, $offset:expr) => ({
        // Evaluate each argument exactly once.
        let src: &[u8] = &$src[..];
        let offset: usize = $offset;
        let size = size_of::<$type>();
        // Compare before subtracting: `src.len() - offset` would underflow for
        // an offset past the end and let the unchecked copy below run.
        if offset <= src.len() && src.len() - offset >= size {
            let mut value: $type = 0;
            // SAFETY: the check above guarantees that `offset..offset + size`
            // lies inside `src`; `value` is a distinct local of exactly `size`
            // bytes, so source and destination are valid and do not overlap.
            unsafe {
                copy_nonoverlapping(
                    src.as_ptr().offset(offset as isize),
                    &mut value as *mut $type as *mut u8,
                    size,
                );
            }
            Some((value, offset + size))
        } else {
            None
        }
    });
}

/// Reads a length-prefixed UTF-8 string from `src`, starting at byte `offset`.
///
/// Returns the string together with the offset of the first byte after it.
/// Returns `None` instead of panicking when the input is malformed: the length
/// prefix is missing or negative, the announced characters run past the end of
/// `src`, or the bytes are not valid UTF-8.
fn bytes_into_str<'a>(src: &'a [u8], offset: usize) -> Option<(&'a str, usize)> {
    // strings are under the format [length|characters]
    // length is encoded on 2 bytes, then comes `length` characters
    const LENGTH_PREFIX_SIZE: usize = 2;

    let start = match offset.checked_add(LENGTH_PREFIX_SIZE) {
        Some(start) if start <= src.len() => start,
        _ => return None,
    };

    // Kafka writes integers in network (big-endian) byte order, so the prefix
    // is assembled explicitly instead of being copied in host byte order.
    let length = (u16::from(src[offset]) << 8 | u16::from(src[offset + 1])) as i16;
    if length < 0 {
        // A negative length cannot describe a run of characters.
        return None;
    }

    // Cannot overflow: `start` is at most `src.len()` and `length` fits in 15 bits.
    let end = start + length as usize;
    src.get(start..end)
        .and_then(|raw| from_utf8(raw).ok())
        .map(|text| (text, end))
}

/// Decodes an offset record from its raw `key` and `payload`.
///
/// The key is read as: 2 version bytes (skipped here), the group name, the
/// topic name, then the partition as a 4-byte integer. An empty `payload`
/// yields `OffsetMessage::OffsetDelete`; otherwise the committed offset is read
/// from the payload and `OffsetMessage::OffsetSet` is returned.
///
/// Returns `None` when the key or the payload is too short or otherwise
/// malformed.
fn parse_offset_message(key: &[u8], payload: &[u8]) -> Option<OffsetMessage> {
    // https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1076
    let (group, offset) = match bytes_into_str(key, 2) {
        Some(parsed) => parsed,
        None => return None,
    };
    let (topic, offset) = match bytes_into_str(key, offset) {
        Some(parsed) => parsed,
        None => return None,
    };
    let partition = match bytes_into!(key, i32, offset) {
        // `bytes_into!` copies the bytes as they are; Kafka writes integers
        // big-endian, so convert to host order.
        Some((raw, _)) => i32::from_be(raw),
        None => return None,
    };

    let group = group.to_owned();
    let topic_partition = TopicPartition::new(topic.to_owned(), partition);

    if payload.is_empty() {
        return Some(OffsetMessage::OffsetDelete {
            group: group,
            topic_partition: topic_partition,
        });
    }

    // A payload shorter than its own version prefix cannot hold an offset.
    if payload.len() < 2 {
        return None;
    }

    // https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1105
    // first 2 bytes are version, then comes the offset in 8 bytes
    bytes_into!(payload, i64, 2).map(|(raw_offset, _)| OffsetMessage::OffsetSet {
        group: group,
        topic_partition: topic_partition,
        offset: i64::from_be(raw_offset) as Offset,
    })
}

/// Turns a consumed Kafka message into an `OffsetMessage`.
///
/// The first two bytes of the key hold a big-endian version number that selects
/// the record type: 0 and 1 are offset records, 2 is group metadata (not
/// decoded).
///
/// # Errors
///
/// Fails when the message has no key, the key is too short to hold a version,
/// the version is unknown, or the offset record cannot be decoded.
fn parse_message(message: BorrowedMessage) -> Result<OffsetMessage> {
    const NO_PAYLOAD: &[u8] = &[];

    let key = message.key().ok_or("Message has no key")?;
    // A record without a value is a tombstone. Pass it on as an empty payload
    // so that `parse_offset_message` reports it as a deletion instead of the
    // record being rejected here.
    let payload = message.payload().unwrap_or(NO_PAYLOAD);

    let version = bytes_into!(key, i16, 0)
        .map(|(raw, _)| i16::from_be(raw))
        .ok_or("Unable to read offset message version")?;

    match version {
        0 | 1 => parse_offset_message(key, payload)
            .ok_or_else(|| "Unable to read offset message".into()),
        2 => Ok(OffsetMessage::GroupMetadata),
        _ => Err("Unknown Key version".into()),
    }
}

impl Watcher for KafkaWatcher {
    /// Creates a stream consumer for `self.brokers` and spawns a thread that
    /// parses every message it yields and forwards the result on the channel.
    ///
    /// Messages that cannot be read, parsed or sent are printed to stdout and
    /// skipped. The spawned thread is detached: its handle is not kept.
    ///
    /// # Panics
    ///
    /// Panics if the consumer cannot be created.
    // NOTE(review): no topic subscription or assignment is visible in this
    // block; confirm where the consumer is pointed at a topic.
    fn start(&mut self) {
        let consumer = ClientConfig::new()
            .set("bootstrap.servers", self.brokers.as_str())
            .set("group.id", "offset-consumer")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer<EmptyConsumerContext>>()
            .expect("Consumer creation failed");

        // The thread owns its own handle to the channel.
        let offsets_tx = self.sender.clone();
        thread::spawn(move || {
            let stream = consumer.start_with(Duration::from_millis(200), true);
            for polled in stream.wait() {
                match polled {
                    Ok(Ok(borrowed)) => {
                        let outcome = parse_message(borrowed).and_then(|parsed| {
                            offsets_tx.send(parsed).chain_err(|| "Unable to send")
                        });

                        if let Err(failure) = outcome {
                            println!("{:?}", failure);
                        }
                    }
                    unreadable => println!("{:?}", unreadable),
                }
            }
        });
    }

    /// Lends out the receiving half of the channel fed by `start`.
    fn receiver(&self) -> &mpsc::Receiver<OffsetMessage> {
        &self.receiver
    }
}

/// Starts a watcher against a local broker and prints the first message it delivers.
fn main() {
    let brokers = String::from("localhost:9092");
    let mut watcher = KafkaWatcher::new(brokers);
    watcher.start();

    // Blocks until the watcher thread delivers its first message.
    let first = watcher.receiver().recv().unwrap();
    println!("{:?}", first);
}