-
Notifications
You must be signed in to change notification settings - Fork 7
/
end_to_end.R
48 lines (30 loc) · 1.19 KB
/
end_to_end.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
library(fRanz)
# Start a local Kafka instance through docker-compose (detached, with an
# image rebuild), and surface a failed start instead of ignoring it.
# NOTE(review): `up -d` returns once the containers are started; the broker
# may not be accepting connections yet. Confirm whether a readiness wait is
# needed before producing.
compose_status <- system("docker-compose up -d --build")
if (!isTRUE(compose_status == 0L)) {
  # Warn rather than stop: a broker started by other means may still be
  # reachable, so the rest of the script is allowed to run as before.
  warning(
    "`docker-compose up -d --build` exited with status ", compose_status,
    "; the local Kafka broker may not be running.",
    call. = FALSE
  )
}
# Connection settings for the broker and the topic this script
# produces to and consumes from.
BROKER_HOST <- "localhost"
BROKER_PORT <- 9092
TOPIC_NAME <- "myTestTopic"
# KafkaBroker: describes the broker at BROKER_HOST:BROKER_PORT; the producer
# and consumers below are all constructed from this object.
broker <- KafkaBroker$new(
  host = BROKER_HOST,
  port = BROKER_PORT
)
# KafkaProducer: build a producer for the broker and send one keyed message
# to TOPIC_NAME.
producer <- KafkaProducer$new(brokers = list(broker))
producer$produce(
  topic = TOPIC_NAME,
  key = "myKey",
  value = "My First Message"
)
# KafkaConsumer: create a consumer with groupId "test", passing
# `auto.offset.reset = "earliest"` through as an extra option, then
# subscribe to TOPIC_NAME and consume from it.
consumer <- KafkaConsumer$new(
  brokers = list(broker),
  groupId = "test",
  extraOptions = list(`auto.offset.reset` = "earliest")
)
consumer$subscribe(topics = TOPIC_NAME)
result <- consumer$consume(topic = TOPIC_NAME)

# Show what was consumed (displayed when run interactively or via Rscript).
result
# Multiple messages ----
# Send two more keyed messages with a freshly built producer, then read the
# topic again with a freshly built consumer.
producer <- KafkaProducer$new(brokers = list(broker))
producer$produce(
  topic = TOPIC_NAME,
  key = "mySecondKey",
  value = "My Second Message"
)
producer$produce(
  topic = TOPIC_NAME,
  key = "myThirdKey",
  value = "My Third Message"
)

# NOTE(review): this consumer reuses groupId "test" from the earlier consumer,
# which is only rebound here and never explicitly closed. Confirm that the new
# consumer receives the messages as intended.
consumer <- KafkaConsumer$new(
  brokers = list(broker),
  groupId = "test",
  extraOptions = list(`auto.offset.reset` = "earliest")
)
consumer$subscribe(topics = TOPIC_NAME)
result <- consumer$consume(topic = TOPIC_NAME)

# Show what was consumed (displayed when run interactively or via Rscript).
result