Source | bactéries Alice
Zebian | Carol
Photo de couverture | RPSC télécharger la Chine visuelle
Produit | RPSC (ID: CSDNnews)
Je crois que beaucoup de petits partenaires ont contacté SparkStreaming, la théorie ne dit pas trop, aujourd'hui vise principalement à mettre à l'intégration de Kafka est SparkStreaming tutoriel.
Ce document contient le code, les amis intéressés peuvent essayer de mains réplicats!
Kafka examen
Avant le début officiel, passons en revue une vague de Kafka.
- Le concept de base illustration
courtier: Services d'installation de la machine Kafka est un courtier
producteur: producteur de message, responsable de l'écriture des données au courtier (push) consommation: Nouvelles des consommateurs, responsable de la prise de données (pull) de kafka traction, l'ancienne version de la nécessité des consommateurs à compter ZK, la nouvelle version ne nécessite pas sujet: Thème, est l'équivalent d'une catégorie de données, les données stockées dans différentes entreprises dans différents sujet - le sujet: Differentiated Services La réplication: Une copie des données stockées nombre de copies (pour assurer que les données ne sont pas perdues) - Copie: sécurité des données partition: Partition est une partition physique, une partition est un fichier, un sujet peut avoir 1 ~ n partitions, chaque partition a sa propre copie - Subdivision: lecture et écriture concurrente Groupe consommateurs: Les groupes de consommateurs, un sujet peut avoir plusieurs consommateurs / groupes de consommateurs en même temps, plus les consommateurs si un groupe de consommateurs, ils ne peuvent pas être données répétées de consommation - groupes de consommateurs: la vitesse d'augmentation des dépenses de consommation, la gestion unifiée pratique Remarque: Sujet peut être plus qu'un abonnement à la consommation ou d'un groupe, un consommateur / groupe peut également vous abonner aux sujets multiples Remarque: Lire les données ne peuvent être lues à partir du chef, les données d'écriture ne peuvent être écrites au leader, Suiveur viendra faire une copie des données Synchronisez Leader de là! ! !
- Common Commandes
Démarrer kafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.propertiesarrêt kafka
/export/servers/kafka/bin/kafka-server-stop.sh
Afficher des informations sur le sujet
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01: 2181rubrique Création
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01: 2181 --replication-facteur 3 --partitions 3 essais --topicAfficher des informations sur un sujet
/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01: 2181 Test --topicSupprimer le sujet
/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01: 2181 Test --delete --topic
producteurs départ - les producteurs sont généralement utilisés pour tester la console
/export/servers/kafka/bin/kafka-console-producer.sh --broker liste node01: 9092 --topic spark_kafka
Début de consommateurs - les consommateurs sont généralement utilisés pour tester la console
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01: 2181 --topic spark_kafka - de-débutLes consommateurs connectés à l'adresse Borker
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-serveur node01: 9092, node02: 9092, node03: 9092 --topic spark_kafka --from-débutL'intégration KAFKA deux modes expliqués
Il est également un visage de questions hot spot.
Développement, nous utilisons souvent SparkStreaming données en temps réel est lu et traité dans kafka, après la version de spark1.3, kafkaUtils qui fournit deux façons de créer DSTREAM de:
1, le mode de réception du récepteur:
- KafkaUtils.createDstream (Sans développement, nous pouvons comprendre, mais l'entretien peut se demander).
-
run récepteur comme Exécuteur résident des tâches en attente dans les données, mais une faible efficacité du récepteur, la nécessité d'ouvrir plusieurs, puis fusionner manuellement les données (syndicat), puis traitées, beaucoup de problèmes
-
Récepteur machine qui a raccroché, les données peuvent être perdues, il est donc nécessaire d'ouvrir WAL (de WAL) pour assurer la sécurité des données, l'efficacité réduit!
-
Le récepteur est relié par l'intermédiaire zookeeper file d'attente Kafka, les appels API Kafka d'ordre supérieur, le décalage stockée dans le zookeeper, maintenu par le récepteur.
-
la consommation d'étincelle de temps afin d'assurer que les données ne sont pas perdues dans le Checkpoint gardera une copie de l'offset, les données peuvent sembler incohérent
-
Donc, peu importe de quel point de vue, le mode récepteur ne sont pas adaptés pour une utilisation dans le développement, il a été éliminé
2, connexion directe directe
- KafkaUtils.createDirectStream (Utilisé dans le développement, nécessaires à la maîtrise)
- Le mode direct est directement connecté aux partitions kafka d'acquisition de données, chaque donnée de partition en lecture directement à partir de la capacité grandement améliorée de parallèle
- Direct Invoke API de bas niveau Kafka (API sous-jacente), offset leur propre stockage et la maintenance, par défaut par le poste de contrôle de maintenance Spark, éliminant ainsi incompatible avec ZK
- Bien sûr, vous pouvez posséder la maintenance manuelle, l'existence mysql offset, en Redis
-
Il peut être utilisé dans le développement basé sur le mode direct et le mode direct par les caractéristiques de fonctionnement manuel + des données précises exactement une fois de plus
Résumé:
- mode de réception Récepteur
Récepteur accepte le risque de multiples rendement élevé de données, mais il y a des données manquantes
Activer le journal (WAL) pour éviter la perte de données, mais la faible efficacité de l'écriture deux fois les données.
Zookeeper sauvegarde des données des consommateurs compensent la duplication possible.
Utiliser l'API de haut niveau
- Direct Direct Connect
Récepteur non utilisé, directement à la lecture de kafka partition de données
Ne pas utiliser le mécanisme log (WAL)
Spark maintenir leur propre décalage
Utilisez API de bas niveau
Extended: A propos de la sémantique du message
Remarque:
le développement Kafka SparkStreaming et de l'intégration en deux versions: 0,8 et 0.10+
Il récepteur version 0.8 et le mode direct (version 0.8 mais plus l'environnement de production ne sont pas pris en charge dans la version 0.8 après Spark2.3 a).
0,10 Après avoir retenu que le mode direct (mode Reveiver ne convient pas pour les environnements de production), et la version 0.10 du changement API (plus puissant)
conclusion:
Notre apprentissage et le développement directement en utilisant la version 0.10 du modèle direct, mais sur la différence entre le récepteur direct et quand l'entrevue devait être en mesure de répondre en
étincelle streaming kafka-0-8 (comprendre)
1.Receiver
récepteurs KafkaUtils.createDstream utilisés pour recevoir des données, l'utilisation des niveaux élevés de consommation Kafka api, compensée par le récepteur de l'entretien, les données reçues pour tous les récepteurs seront enregistrés dans Huissiers Spark, suivi par Spark travail en continu pour commencer le traitement de ces données, seront perdues par défaut, pour activer le journal WAL, il est synchronisé avec les données reçues stockées sur un système de fichiers distribué tels que HDFS. Assurez-vous que les données en cas d'erreur peuvent être récupérés. Bien que cette approche peut être combiné avec le mécanisme WAL pour assurer aucune perte et une grande fiabilité des données, mais l'efficacité a permis WAL sera plus faible, et ne peut pas garantir que les données sont traitées une fois et une seule fois, peut être traitée deux fois. Parce qu'il ne peut pas être synchronisés entre le Spark et ZooKeeper.
(Approche maintenant intégrée officiellement n'est pas recommandé.)
- préparations
1) groupe Zookeeper Démarrer
zkServer.sh début
2) Démarrer kafka groupe
kafka-server-start.sh /export/servers/kafka/config/server.properties3. Créer un sujet
kafka-topics.sh --create --zookeeper node01: 21811 --partitions --replication facteur 3 --topic spark_kafka4. Envoyer un message à l'interpréteur de commandes par thème
kafka-console-producer.sh --broker liste node01: 9092 --topic spark_kafka5. Ajouter la kafka dépendante pom
< dépendance > < groupId > org.apache.spark < / GroupId > < artifactId > étincelle streaming kafka-0-8_2.11 < / ArtifactId > < version > 2.2.0 < / version > < / dépendance >- API
Acquis par le récepteur dans le récepteur des données de sujet kafka peuvent fonctionner en parallèle récepteur plus lit le sujet kafak de données, ici 3
val receiverDStream: immutable.IndexedSeq = (1 à 3) .map (x = > { flux val: ReceiverInputDStream = KafkaUtils.createStream (ssc, zkQuorum, groupId, sujets) ruisseau })Si la WAL (spark.streaming.receiver.writeAheadLog.enable = true) peut être réglé pour activer le niveau de stockage (par défaut StorageLevel.MEMORY_AND_DISK_SER_2)
code démontre
org.apache.spark.streaming.dstream d'importation. {DSTREAM, ReceiverInputDStream} org.apache.spark.streaming.kafka.KafkaUtils d'importation org.apache.spark.streaming d'importation. {secondes, StreamingContext} org.apache.spark d'importation. {SparkConf, SparkContext} importation scala.collection.immutable objet SparkKafka { def principaux (args: Array): Unité = { 1 // Création StreamingContext config val: SparkConf = nouvelle SparkConf.setAppName ( "SparkStream"). setMaster ( "local") .set ( "spark.streaming.receiver.writeAheadLog.enable", "true") // ouvert WAL WAL pour assurer la fiabilité de la source de données val sc = new SparkContext (config) sc.setLogLevel ( "WARN") val ssc = new StreamingContext (sc, secondes (5)) ssc.checkpoint ( "./ kafka") // ============================================== // 2. Préparer les paramètres de configuration val zkQuorum = "node01: 2181, node02: 2181, node03: 2181" val groupId = "étincelle" val = sujets Plan ( "spark_kafka" - > 2) // 2 représentent chacun le sujet correspond à partition utilise deux fils de passer, RDD kafka des partitions et des partitions // ssc le sujet n'est pas le même, ce qui augmente le nombre de threads de consommation, ne pas augmenter le nombre de traitement parallèle des étincelles de données // 3. Kafka acquis par le récepteur de récepteur de données sujet peut fonctionner plus récepteur parallèle lit le sujet kafak de données, ici 3 val receiverDStream: immutable.IndexedSeq = (1 à 3) .map (x = > { flux val: ReceiverInputDStream = KafkaUtils.createStream (ssc, zkQuorum, groupId, sujets) ruisseau }) // 4. méthode de l'Union utilisé, tout le réceptacle récepteur DSTREAM sont combinés pour produire val allDStream: DSTREAM = ssc.union (receiverDStream) // 5. Les données acquises sujet (String, String) Représentation de chaîne d'un nom de sujet et deuxième données représentatives de sujet Chaîne val données: DSTREAM = allDStream.map (_._ 2) // ============================================== //6.WordCount mots val: DSTREAM = data.flatMap (. _ split ( "")) val wordAndOne: DSTREAM = words.map ((_, 1)) Résultat val: DSTREAM = wordAndOne.reduceByKey (+ _ _) result.print ssc.start ssc.awaitTermination } }2.Direct
mode direct vérifiera périodiquement le sujet de la partition correspondante kafka les dernières données de décalage traitées dans chaque lot, puis à l'intérieur d'une plage de décalage, plage de lecture API Spark en invoquant simple des données de consommation kafka .
-
carence directe ne repose pas les outils de surveillance de kafka de Zookeeper
-
La comparaison directe approche fondée sur le récepteur présente plusieurs avantages:
Kafka besoin de créer plusieurs flux d'entrée, et leur union, sparkStreaming sera créé et le nombre de partitions kafka RDD nombre de partitions différentes, et de lire les données du kafka parallèle, étincelle RDD le nombre de partitions et les partitions kafka les données sont relation un à un.
Récepteur atteindre zéro perte de données sont les données précédemment stockées dans WAL, les données sont copiées à nouveau, elle se traduira par des données copiées deux fois, le premier étant copié kafka, et une fois écrit au WAL. Le Direct ne pas utiliser ce WAL élimine problème.
les données du récepteur est lu par kafka kafka api de haut niveau pour compenser Zookeeper d'écriture, bien que cette méthode peut enregistrer les données dans le WAL assurer que les données ne sont pas perdues, mais l'écart peut être enregistré car le sparkStreaming et ZK décalage les données de cause à consommer plusieurs fois.
Direct-les exactement une fois sémantique (EOS) kafka obtenus par api bas niveau, seul le décalage est stocké dans le poste de contrôle ssc, ce qui élimine les incohérences et ssc ZK problèmes de décalage.
- API
code démontre
importation kafka.serializer.StringDecoder org.apache.spark.streaming.dstream d'importation. {DSTREAM, InputDStream} org.apache.spark.streaming.kafka.KafkaUtils d'importation org.apache.spark.streaming d'importation. {secondes, StreamingContext} org.apache.spark d'importation. {SparkConf, SparkContext} SparkKafka2 objet { def principaux (args: Array): Unité = { 1 // Création StreamingContext config val: SparkConf = nouvelle SparkConf.setAppName ( "SparkStream"). setMaster ( "local") val sc = new SparkContext (config) sc.setLogLevel ( "WARN") val ssc = new StreamingContext (sc, secondes (5)) ssc.checkpoint ( "./ kafka") // ============================================== // 2. Préparer les paramètres de configuration val kafkaParams = Carte ( "metadata.broker.list" - > "Node01: 9092, node02: 9092, node03: 9092", "group.id" - > "Spark") sujets val = Set ( "spark_kafka") val allDStream: InputDStream = KafkaUtils.createDirectStream (ssc, kafkaParams, sujets) // 3. Le sujet des données acquises val données: DSTREAM = allDStream.map (_._ 2) // ============================================== // WordCount mots val: DSTREAM = data.flatMap (. _ split ( "")) val wordAndOne: DSTREAM = words.map ((_, 1)) Résultat val: DSTREAM = wordAndOne.reduceByKey (+ _ _) result.print ssc.start ssc.awaitTermination } }étincelle streaming kafka-0-10
- explication
étincelle streaming kafka-0-10 la version, il y a quelques modifications de l'API, un fonctionnement plus souple, utilisé dans le développement
- pom.xml
- API:
- rubrique Création
-
Producteur début
-
code démontre
Eh bien, le processus d'intégration SparkStreaming Kafka dans ce chapitre montrent pour expliquer et vous amène à revoir les bases d'une vague de Kafka, si utile pour vous, les mains mal à la main le point d'une « observation », il ~
Cet article par les auteurs à partir RPSC Blog, le lien d'origine:
https://blog.csdn.net/weixin_44318830/article/details/105612516
chiffre d'affaires flambée de 30 ans: d'une communauté libre à une entreprise de plusieurs milliards de dollars
comprendre l'une des plus grandes réalisations de l'IA: limitations convolution réseau de neurones
GitHub a joué 10000 +, projet de haut niveau Apache ShardingSphere de The Open Road
HKUST Académicien interrogation future Zheng Guangting, a révélé les dernières applications et la pratique de l'intelligence artificielle
intelligents d'exploitation et d'entretien des défis en grande promotion: comment Ali résista les « doubles 11 chats fin »?
Ethernet Place 2.0 Jeu de garde et mettre en uvre MPC
très difficile pour vous d'écrire neuf questions face de MySQL, nous vous recommandons de la collection!