avatar
Untitled

Guest 101 27th Dec, 2019

MARKUP 4.16 KB
                                           
                         const { Transform } = require('stream')
const { 
  KafkaClient, 
  Producer,
  Consumer
 } = require('kafka-node')

/**
 * Create a kafka-node Producer bound to `topic`, ensuring the topic exists first.
 *
 * @param {object} kafka_options - options forwarded to KafkaClient (e.g. kafkaHost).
 * @param {string} topic - topic to create and to send all messages to.
 * @returns {Promise<Producer>} producer augmented with a `sendToTopic(key, message, options)` helper.
 * @throws rejects if topic creation fails.
 */
const TopicProducer = async ( kafka_options, topic ) => {
  const client = new KafkaClient( kafka_options )
  const producer = new Producer( client )

  // Promise wrapper around kafka-node's callback-based createTopics.
  // (The original accepted an ignored argument; `topic` is taken from the closure.)
  const createTopicsAsync = () => {
    return new Promise( ( resolve, reject ) => {
      console.log( `Creating Topic: ${topic}` )
      client.createTopics( [ { topic } ], ( err, data ) => {
        if ( err ) {
          reject( err )
        } else {
          console.log( `Created Topic: ${topic}` )
          resolve( data )
        }
      })
    })
  }

  await createTopicsAsync()

  // Convenience helper: JSON-encode key and message and send to the bound topic.
  producer.sendToTopic = ( key, message, options ) => {
    const messages = JSON.stringify( message )
    producer.send([{
      ...options,
      topic,
      key: JSON.stringify( key ),
      messages
    }], ( error ) => {
      if ( error ) {
        // BUG FIX: the original logged `e.message`, but no `e` is in scope —
        // any send failure threw a ReferenceError instead of being reported.
        console.error( error.message )
      } else {
        console.log(`[${topic}] Sent ${messages.length} bytes `)
      }
    })
  }

  return producer
}

/**
 * Build a kafka-node Consumer subscribed to a single topic.
 * Keys and values are decoded as utf-8 by default; any entry in `config`
 * (spread last) overrides those defaults.
 *
 * @param {object} kafka_options - options forwarded to KafkaClient.
 * @param {string} topic - topic to subscribe to.
 * @param {object} [config] - extra Consumer options, merged over the defaults.
 * @returns {Consumer}
 */
const TopicConsumer = ( kafka_options, topic, config ) => {
  const kafkaClient = new KafkaClient( kafka_options )
  const consumerOptions = { keyEncoding: 'utf-8', encoding: 'utf-8', ...config }
  return new Consumer( kafkaClient, [ { topic } ], consumerOptions )
}

/**
 * Create a writable Transform stream that publishes each chunk to `topic`.
 * Every chunk is tagged with a monotonically increasing `order` in its key
 * so the consumer side can re-sequence messages.
 *
 * @param {object} kafka_options - options forwarded to KafkaClient.
 * @param {string} topic - destination topic (created if missing).
 * @param {object} [key] - extra fields spread into each message key
 *   (spread last, so it may override `order`/`topic`).
 * @returns {Promise<Transform>} object-mode stream; write chunks into it.
 */
const ProducerStream = async ( kafka_options, topic, key ) => {
  const producer = await TopicProducer( kafka_options, topic )

  let bytesStreamed = 0
  let sequence = 0

  const streamToTopic = new Transform({
    objectMode: true,
    decodeStrings: false,
    transform( chunk, _encoding, done ) {
      sequence += 1
      const keyPayload = { order: sequence, topic, ...key }
      done( null, {
        topic: topic,
        key: JSON.stringify( keyPayload ),
        messages: Buffer.from( chunk )
      })
    }
  })

  // Forward every transformed payload to Kafka and report running byte count.
  streamToTopic.on( 'data', payload => {
    bytesStreamed += payload.messages.length
    producer.send( [ payload ], () => console.log( `Streamed ${bytesStreamed} to ${payload.topic}` ) )
  })

  return streamToTopic
}

/**
 * Consume `topic` as raw buffers and expose the messages as a readable
 * Transform stream, re-ordered by the `order` field embedded in each key.
 * Messages are buffered and flushed in batches.
 *
 * @param {object} kafka_options - options forwarded to KafkaClient.
 * @param {string} topic - topic to consume.
 * @param {number} size - total number of bytes expected; used for the
 *   progress log and to force a final flush when all bytes have arrived.
 * @param {object} [config] - extra Consumer options; may carry a numeric
 *   `buffer_size` (messages per flush, default 1024).
 * @returns {Transform} object-mode stream emitting Buffers in key order.
 */
const ConsumerStream = ( kafka_options, topic, size, config ) => {
  const client = new KafkaClient( kafka_options )
  const consumer = new Consumer( client, [ { topic } ], { ...config, keyEncoding: 'utf-8', encoding: 'buffer' } )

  // BUG FIX: the original read `config || 1024` — `config` is an options
  // *object*, so any truthy config made `messages.length % buffer_size` NaN
  // and the periodic flush never fired. Take a numeric option instead.
  const buffer_size = ( config && config.buffer_size ) || 1024
  let data_streamed = 0
  let current = 0
  const messages = []

  const topicToStream = new Transform({
    objectMode: true,
    decodeStrings: false,
    transform( value, encoding, callback ) {
      callback( null, Buffer.from( value ) );
    }
  })

  // Sort a batch by the producer-assigned `order` and push it downstream.
  const processStream = ( batch ) => {
    console.log(`Processing ${batch.length}`)
    batch
      .slice() // sort() mutates; work on a copy of the batch
      .sort( ( a, b ) => JSON.parse( a.key ).order - JSON.parse( b.key ).order )
      .forEach( msg => topicToStream.push( Buffer.from( msg.value ) ) )
  }

  consumer.on( 'message', ( message ) => {
    messages.push( message )
    data_streamed += message.value.length

    const key = JSON.parse( message.key )
    console.log( key )

    // Flush every `buffer_size` messages, or when all expected bytes arrived.
    if ( messages.length % buffer_size === 0 || size === data_streamed ) {
      const sliceOfMessage = messages.slice( current, messages.length )
      processStream( sliceOfMessage )
      // BUG FIX: the original did `current += messages.length`, which jumps
      // past unflushed messages after the first flush. `current` is a
      // high-water mark and must be SET to the length already processed.
      current = messages.length
    }

    console.log(`Stream Progress - ( ${ Math.floor( ( data_streamed / size ) * 100 )} % ) [ ${data_streamed} / ${size} ]`, `Buffered [${current}/${messages.length }]` )
  })

  return topicToStream
}



module.exports = {
  ProducerStream,
  ConsumerStream,
  TopicProducer,
  TopicConsumer
}
                      
                                       
To share this paste, copy this URL and send it to your friends
RAW Paste Data

Comments

Authentication required

You must log in to post a comment.

Log in
    There are no comments yet.