Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send notifications using Rabbitmq instead of Cassandra #4138

Draft
wants to merge 46 commits into
base: hackathon-2024-deployment
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
dfc06c1
gundeck: Add Rabbitmq config
akshaymankar Jul 10, 2024
ad7eac4
cannon: Add rabbitmq config
akshaymankar Jul 10, 2024
4f61fdc
docker: Enable streams in rabbitmq
akshaymankar Jul 10, 2024
afed7eb
Make NotificationId a Text
akshaymankar Jul 10, 2024
9342456
Get gundeck and cannon to compile
akshaymankar Jul 10, 2024
9f49cc7
Delete cannon internal api and Gundeck.Push.Websocket module
akshaymankar Jul 10, 2024
7160971
Gundeck: Push notifs in rabbit, delete a bunch code (probs some of it…
akshaymankar Jul 10, 2024
80f18f0
galley: Seperate TeamNotifications types (hack some parsing things tho)
akshaymankar Jul 10, 2024
b1a8537
WIP
pcapriotti Jul 10, 2024
628f3db
WIP
pcapriotti Jul 10, 2024
33976fb
gundeck: Fetch notifs from rabbitmq
akshaymankar Jul 10, 2024
6460420
Initial implementation of rabbit -> ws push
pcapriotti Jul 10, 2024
b962865
gundeck: Use only rabbitmq to persist and retrive notifs
akshaymankar Jul 10, 2024
d76faae
gundeck: Deal with rabbit errors
akshaymankar Jul 10, 2024
40a48f1
integration: remove redundant constraints
akshaymankar Jul 10, 2024
7b19205
Cancel consumer when terminating ws connection
pcapriotti Jul 10, 2024
162e63d
gundeck: Delete mpaBulkPush
akshaymankar Jul 10, 2024
19631f4
gundeck: Delete mpaMkNotifcationId
akshaymankar Jul 10, 2024
c451fd4
gundeck: Delete mpaNotificationTTL
akshaymankar Jul 10, 2024
0ac8ebd
gundeck: Delete mpaListAllPresences
akshaymankar Jul 10, 2024
7bc9c48
gundeck: Delete presences
akshaymankar Jul 10, 2024
04cf5be
gundeck: Remove redis connection code TODO: Delete redis from opts
akshaymankar Jul 10, 2024
ddd61c5
gundeck: change type NotificationId from Text to Int64.
fisx Jul 10, 2024
49523ab
gundeck: provide totally awesome conversion functions from Int64 to U…
fisx Jul 10, 2024
fc7105e
Revert "gundeck: change type NotificationId from Text to Int64."
akshaymankar Jul 11, 2024
a18763e
charts/gundeck: Replace redis config with rabbitmq
akshaymankar Jul 11, 2024
7f7857b
charts/cannon: Add rabbitmq config
akshaymankar Jul 11, 2024
422858e
small cannon fixes
pcapriotti Jul 11, 2024
0a0355d
gundeck.integration.yaml: delete redis
akshaymankar Jul 11, 2024
bd24885
charts/rabbitmq: enable stream plugin
akshaymankar Jul 11, 2024
171c13c
Elaborate on NotificationId type TODO.
fisx Jul 11, 2024
42f5b11
cannon: call ensureNotificationStream
pcapriotti Jul 11, 2024
bc828cf
cannon: create channel for every ws
pcapriotti Jul 11, 2024
0ebb86c
Delete more redis stuff
akshaymankar Jul 11, 2024
0545ea6
cannon: Fetch queue with admin API before using it, somehow fixes thi…
akshaymankar Jul 11, 2024
70010d6
charts/cannon: Configure admin port
akshaymankar Jul 11, 2024
f1231f0
Hi CI
akshaymankar Jul 11, 2024
78d7cde
Delete tests to fix them
akshaymankar Jul 11, 2024
fb98954
Fix order of gundeck routes
pcapriotti Jul 11, 2024
503bf58
Fixup
fisx Jul 11, 2024
ebf0675
Traverse notification backwards
pcapriotti Jul 11, 2024
9d0724a
Merge remote-tracking branch 'refs/remotes/origin/notifs-without-cass…
fisx Jul 11, 2024
e2221ab
Fix compiler errors
fisx Jul 11, 2024
d45edb4
Fix compiler errors, again.
fisx Jul 11, 2024
55fb804
remove failing test.
fisx Jul 11, 2024
6b8d1d4
Acknowledge messages when fetching
pcapriotti Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions charts/cannon/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ data:
host: gundeck
port: 8080

{{- with .Values.config.rabbitmq }}
rabbitmq:
host: {{ .host }}
adminPort: {{ .adminPort }}
port: {{ .port }}
vHost: {{ .vHost }}
enableTls: {{ .enableTls }}
insecureSkipVerifyTls: {{ .insecureSkipVerifyTls }}
{{- if .tlsCaSecretRef }}
caCert: /etc/wire/cannon/rabbitmq-ca/{{ .tlsCaSecretRef.key }}
{{- end }}
{{- end }}

drainOpts:
gracePeriodSeconds: {{ .Values.config.drainOpts.gracePeriodSeconds }}
millisecondsBetweenBatches: {{ .Values.config.drainOpts.millisecondsBetweenBatches }}
Expand Down
9 changes: 9 additions & 0 deletions charts/cannon/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ spec:
mountPath: /etc/wire/cannon/externalHost
- name: cannon-config
mountPath: /etc/wire/cannon/conf
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
mountPath: "/etc/wire/cannon/rabbitmq-ca/"
{{- end }}
ports:
- name: http
containerPort: {{ .Values.service.internalPort }}
Expand Down Expand Up @@ -148,3 +152,8 @@ spec:
secret:
secretName: {{ .Values.service.nginz.tls.secretName }}
{{- end }}
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
secret:
secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }}
{{- end }}
10 changes: 10 additions & 0 deletions charts/cannon/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ config:
logLevel: Info
logFormat: StructuredJSON
logNetStrings: false
rabbitmq:
host: rabbitmq
port: 5672
adminPort: 15672
vHost: /
enableTls: false
insecureSkipVerifyTls: false
# tlsCaSecretRef:
# name: <secret-name>
# key: <ca-attribute>

# See also the section 'Controlling the speed of websocket draining during
# cannon pod replacement' in docs/how-to/install/configuration-options.rst
Expand Down
28 changes: 9 additions & 19 deletions charts/gundeck/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,15 @@ data:
tlsCa: /etc/wire/gundeck/cassandra/{{- (include "tlsSecretRef" . | fromYaml).key }}
{{- end }}

redis:
host: {{ .redis.host }}
port: {{ .redis.port }}
connectionMode: {{ .redis.connectionMode }}
enableTls: {{ .redis.enableTls }}
insecureSkipVerifyTls: {{ .redis.insecureSkipVerifyTls }}
{{- if eq (include "configureRedisCa" .) "true" }}
tlsCa: /etc/wire/gundeck/redis-ca/{{ include "redisTlsSecretKey" .}}
{{- end }}

{{- if .redisAdditionalWrite }}
redisAdditionalWrite:
host: {{ .redisAdditionalWrite.host }}
port: {{ .redisAdditionalWrite.port }}
connectionMode: {{ .redisAdditionalWrite.connectionMode }}
enableTls: {{ .redisAdditionalWrite.enableTls }}
insecureSkipVerifyTls: {{ .redisAdditionalWrite.insecureSkipVerifyTls }}
{{- if eq (include "configureAdditionalRedisCa" .) "true" }}
tlsCa: /etc/wire/gundeck/additional-redis-ca/{{ include "additionalRedisTlsSecretKey" .}}
{{- with .rabbitmq }}
rabbitmq:
host: {{ .host }}
port: {{ .port }}
vHost: {{ .vHost }}
enableTls: {{ .enableTls }}
insecureSkipVerifyTls: {{ .insecureSkipVerifyTls }}
{{- if .tlsCaSecretRef }}
caCert: /etc/wire/gundeck/rabbitmq-ca/{{ .tlsCaSecretRef.key }}
{{- end }}
{{- end }}

Expand Down
21 changes: 6 additions & 15 deletions charts/gundeck/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,10 @@ spec:
secret:
secretName: {{ (include "tlsSecretRef" .Values.config | fromYaml).name }}
{{- end}}
{{- if eq (include "configureRedisCa" .Values.config) "true" }}
- name: "redis-ca"
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
secret:
secretName: {{ include "redisTlsSecretName" .Values.config }}
{{- end }}
{{- if eq (include "configureAdditionalRedisCa" .Values.config) "true" }}
- name: "additional-redis-ca"
secret:
secretName: {{ include "additionalRedisTlsSecretName" .Values.config }}
secretName: {{ .Values.config.rabbitmq.tlsCaSecretRef.name }}
{{- end }}
containers:
- name: gundeck
Expand All @@ -62,13 +57,9 @@ spec:
- name: "gundeck-cassandra"
mountPath: "/etc/wire/gundeck/cassandra"
{{- end }}
{{- if eq (include "configureRedisCa" .Values.config) "true" }}
- name: "redis-ca"
mountPath: "/etc/wire/gundeck/redis-ca/"
{{- end }}
{{- if eq (include "configureAdditionalRedisCa" .Values.config) "true" }}
- name: "additional-redis-ca"
mountPath: "/etc/wire/gundeck/additional-redis-ca/"
{{- if .Values.config.rabbitmq.tlsCaSecretRef }}
- name: "rabbitmq-ca"
mountPath: "/etc/wire/gundeck/rabbitmq-ca/"
{{- end }}
env:
{{- if hasKey .Values.secrets "awsKeyId" }}
Expand Down
28 changes: 4 additions & 24 deletions charts/gundeck/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,15 @@ config:
# tlsCaSecretRef:
# name: <secret-name>
# key: <ca-attribute>
redis:
host: redis-ephemeral-master
port: 6379
connectionMode: "master" # master | cluster
rabbitmq:
host: rabbitmq
port: 5672
vHost: /
enableTls: false
insecureSkipVerifyTls: false
# To configure custom TLS CA, please provide one of these:
# tlsCa: <CA in PEM format (can be self-signed)>
#
# Or refer to an existing secret (containing the CA):
# tlsCaSecretRef:
# name: <secret-name>
# key: <ca-attribute>

# To enable additional writes during a migration:
# redisAdditionalWrite:
# host: redis-two
# port: 6379
# connectionMode: master
# enableTls: false
# insecureSkipVerifyTls: false
#
# # To configure custom TLS CA, please provide one of these:
# # tlsCa: <CA in PEM format (can be self-signed)>
# #
# # Or refer to an existing secret (containing the CA):
# # tlsCaSecretRef:
# # name: <secret-name>
# # key: <ca-attribute>
bulkPush: true
aws:
region: "eu-west-1"
Expand Down
2 changes: 2 additions & 0 deletions charts/rabbitmq/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
rabbitmq:
extraPlugins: rabbitmq_stream
6 changes: 5 additions & 1 deletion deploy/dockerephemeral/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ services:

rabbitmq:
container_name: rabbitmq
image: rabbitmq:3.11-management-alpine
build:
context: .
dockerfile_inline: |
FROM rabbitmq:3.11-management-alpine
RUN rabbitmq-plugins enable rabbitmq_stream
environment:
- RABBITMQ_USERNAME
- RABBITMQ_PASSWORD
Expand Down
22 changes: 22 additions & 0 deletions integration/test/Test/MLS/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,33 @@ module Test.MLS.Message where

import API.Galley
import API.Gundeck
import qualified Data.Aeson as Aeson
import MLS.Util
import Notifications
import SetupHelpers
import Testlib.Prelude

-- import UnliftIO.Concurrent (threadDelay)

testFoo :: (HasCallStack) => App ()
testFoo = replicateM_ 10 $ do
alice <- randomUser OwnDomain def
printJSON alice
-- threadDelay 1000000
withWebSocket alice $ \ws -> do
-- liftIO $ threadDelay 1000000
void $ createMLSClient def alice
n <- awaitMatch isUserClientAddNotif ws
printJSON n

testBar :: (HasCallStack) => App ()
testBar = do
alice <- randomUser OwnDomain def
printJSON alice
getNotifications alice def `bindResponse` \resp -> do
resp.status `shouldMatchInt` 200
resp.json `shouldMatch` Aeson.Null

-- | Test happy case of federated MLS message sending in both directions.
testApplicationMessage :: (HasCallStack) => App ()
testApplicationMessage = do
Expand Down
29 changes: 10 additions & 19 deletions integration/test/Testlib/Cannon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ import qualified Network.HTTP.Client as Http
import qualified Network.WebSockets as WS
import System.Random (randomIO)
import System.Timeout (timeout)
import Testlib.App
import Testlib.Assertions
import Testlib.Env
import Testlib.HTTP
Expand Down Expand Up @@ -148,9 +147,11 @@ clientApp wsChan latch conn = do
case decodeStrict' bs of
Just n -> atomically $ writeTChan wsChan n
Nothing -> putStrLn $ "Failed to decode notification: " ++ show bs
wsWrite = forever $ do
takeMVar latch
WS.sendClose conn ("close" :: ByteString)
wsWrite = do
WS.sendPing conn ("hello" :: ByteString)
forever $ do
takeMVar latch
WS.sendClose conn ("close" :: ByteString)

-- | Start a client thread in 'Async' that opens a web socket to a Cannon, wait
-- for the connection to register with Gundeck, and return the 'Async' thread.
Expand All @@ -164,7 +165,6 @@ run wsConnect app = do
serviceMap <- getServiceMap domain

let HostPort caHost caPort = serviceHostPort serviceMap Cannon
latch <- liftIO newEmptyMVar

connId <- case wsConnect.conn of
Just c -> pure c
Expand All @@ -184,6 +184,7 @@ run wsConnect app = do
r <- rawBaseRequest domain Cannon Versioned path
pure r {HTTP.requestHeaders = caHdrs}

waitForPong <- liftIO $ newEmptyMVar
wsapp <-
liftIO
$ async
Expand All @@ -192,21 +193,11 @@ run wsConnect app = do
caHost
(fromIntegral caPort)
path
WS.defaultConnectionOptions
(WS.defaultConnectionOptions {WS.connectionOnPong = void $ tryPutMVar waitForPong ()})
caHdrs
app
)
$ \(e :: SomeException) -> putMVar latch e

presenceRequest <-
baseRequest domain Cannon Unversioned $
"/i/presences/" <> wsConnect.user <> "/" <> connId

waitForPresence <- appToIO $ retryT $ do
response <- submit "HEAD" presenceRequest
status response `shouldMatchInt` 200
let waitForException = do
ex <- takeMVar latch
$ \(ex :: SomeException) -> do
-- Construct a "fake" response. We do not really have access to the
-- websocket connection requests and response, unfortunately, but it is
-- useful to display some information about the request in case an
Expand All @@ -220,8 +211,8 @@ run wsConnect app = do
request = request
}
throwIO (AssertionFailure callStack (Just r) (displayException ex))

liftIO $ race_ waitForPresence waitForException
-- TODO: add a race so we timeout
liftIO $ takeMVar waitForPong
pure wsapp

close :: (MonadIO m) => WebSocket -> m ()
Expand Down
12 changes: 9 additions & 3 deletions libs/extended/src/Network/AMQP/Extended.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ data RabbitMqTlsOpts = RabbitMqTlsOpts
{ caCert :: !(Maybe FilePath),
insecureSkipVerifyTls :: Bool
}
deriving (Show)
deriving (Show, Eq)

parseTlsJson :: Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson v = do
Expand All @@ -76,7 +76,7 @@ data RabbitMqAdminOpts = RabbitMqAdminOpts
tls :: Maybe RabbitMqTlsOpts,
adminPort :: !Int
}
deriving (Show)
deriving (Eq, Show)

instance FromJSON RabbitMqAdminOpts where
parseJSON = withObject "RabbitMqAdminOpts" $ \v ->
Expand All @@ -87,6 +87,9 @@ instance FromJSON RabbitMqAdminOpts where
<*> parseTlsJson v
<*> v .: "adminPort"

instance ToJSON RabbitMqAdminOpts where
toJSON = error "RabbitMqAdminOpts toJSON not implemented due to developer laziness"

mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = do
(username, password) <- readCredsFromEnv
Expand All @@ -111,7 +114,7 @@ data RabbitMqOpts = RabbitMqOpts
vHost :: !Text,
tls :: !(Maybe RabbitMqTlsOpts)
}
deriving (Show)
deriving (Show, Eq)

instance FromJSON RabbitMqOpts where
parseJSON = withObject "RabbitMqAdminOpts" $ \v ->
Expand All @@ -121,6 +124,9 @@ instance FromJSON RabbitMqOpts where
<*> v .: "vHost"
<*> parseTlsJson v

instance ToJSON RabbitMqOpts where
toJSON = error "RabbitMqOpts toJSON not implemented due to developer laziness"

demoteOpts :: RabbitMqAdminOpts -> RabbitMqOpts
demoteOpts RabbitMqAdminOpts {..} = RabbitMqOpts {..}

Expand Down
9 changes: 8 additions & 1 deletion libs/extended/src/Network/RabbitMqAdmin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ data AdminAPI route = AdminAPI
:> "queues"
:> Capture "vhost" VHost
:> Get '[JSON] [Queue],
getQueue ::
route
:- "api"
:> "queues"
:> Capture "vhost" VHost
:> Capture "queue" Text
:> Get '[JSON] Queue,
deleteQueue ::
route
:- "api"
Expand All @@ -43,7 +50,7 @@ data AuthenticatedAPI route = AuthenticatedAPI
}
deriving (Generic)

data Queue = Queue {name :: Text, vhost :: Text}
data Queue = Queue {name :: Text, vhost :: Text, status :: Maybe Text}
deriving (Show, Eq, Generic)

instance FromJSON Queue
Expand Down
6 changes: 2 additions & 4 deletions libs/wire-api/src/Wire/API/Internal/Notification.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ import Wire.API.Notification
-- Notification

data Notification = Notification
{ ntfId :: !NotificationId,
ntfTransient :: !Bool,
{ ntfTransient :: !Bool,
ntfPayload :: !(List1 Object)
}
deriving (Eq, Show)
Expand All @@ -65,8 +64,7 @@ instance S.ToSchema Notification where
schema =
S.object "Notification" $
Notification
<$> ntfId S..= S.field "id" S.schema
<*> ntfTransient S..= (fromMaybe False <$> S.optField "transient" S.schema)
<$> ntfTransient S..= (fromMaybe False <$> S.optField "transient" S.schema)
<*> (toNonEmpty . ntfPayload) S..= fmap List1 (S.field "payload" (S.nonEmptyArray S.jsonObject))

--------------------------------------------------------------------------------
Expand Down
Loading