网站首页 > 技术文章 正文
一些具有挑战性的Spark SQL问题,可以轻松解决许多实际问题(包括解决方案)
Spark SQL非常易于使用。 您可能已经知道,掌握它也很困难。
要精通Spark,必须具备以下三个基本技能:
· 处理和理解数据的能力
· 有关如何使工具适应程序员需求的知识
· 在影响Spark作业执行的因素之间寻找平衡的技巧
我精心设计了以下六个练习,这些练习类似于Spark开发人员在构建管道时每天面对的一些典型情况:这些将有助于评估上述技能。
您可以在本文结尾处找到建议的解决方案!
数据集
让我们简要描述一下我们将要使用的数据集:它由来自商店数据库的三个表组成,包括产品,销售和卖方。 数据可在Parquet文件中找到,您可以从下面的链接下载。 请注意,要100%利用练习,您需要阅读提供的文件! (.zip,?6GB,如果您无法下载数据,则可以点击此处找到生成器脚本)
DatasetToCompleteTheSixSparkExercises.zip
包含三个表的数据集可做六个练习
下图显示了如何连接表:
销售表
该表中的每一行都是一个订单,每个订单只能包含一个产品。 每行存储以下字段:
· order_id:订单ID
· product_id:订单中出售的单个产品。 所有订单只有一个产品)
· Seller_id:销售产品的销售员工ID
· num_pieces_sold:订单中特定产品的销售数量
· bill_raw_text:一个字符串,表示与订单关联的账单的原始文本
· 日期:订单的日期。
这是表格的示例:
order_id,product_id,seller_id,date,num_pieces_sold,bill_raw_text
1,0,0,2020-07-07,12,bdgwqyboczbitqzsxwmtjhehnxyvdfsddlftaiPmDbzqntavetfrxxdyparkxinoqVcgqycdkaeowfwmingazormirqlwgcvztqdrdtnhjmkhgpxifzzadlecpycygfpdmedzecbddjujesrypwgrrlcbnorfjpyknmaccpmnagcaijgkegnpikisizjmjooicizlqneuqscjvktzbsvwpvjfdthmvmgqasstsxkcvfwdutoedebkpxagljrcspxblsfucsktxplyphxbytpozaomcbovjzymqwjzhmcrfqlnpqtlcxcjlqgivhsipmudmpgnmzozmzdfbyvnuszezasszdofwzbnyfcvqtlhohofagfdsgpcbgjsaspbnkghlqmmoekikkzfzasuzfevqpyyvzqluukxsqufviyejrgjneyocosghxqkefrplpomldzgcyrbmbckaqowwzxoeasfjndgszeeijciUficjumhajvpvau
2,0,0,2020-07-01,71,wmewueqbyoqivcpjfsmadtgxsvxjaretdoitcymxqnfWfbycedbbvhexudyhjapqyidfuxwvnilaifpylpfxlsryrtexnfckgkmcdmmkxjrcgpultohyuqbwkrqrlkoezzehlaiahlhontcsvibbtbvclevfhiqkinklxfprejfdldzgcofhqwgyyuvxsdggpadsemwiqtcujsjcrukrildfelwagyvoduzevilIzuimflinuecfattldmgggxjyujhwhjnlqucrebqpdnoczlnouftexamxicbbfdchtZmycpjpgylmzxqcehpqspgftjfcwhzxiblqeqbcnvrjvlwsiwpwutppzgclhizcmexwhsynxacayaaulbGkoosviaxrpjpgaiisbcmkzfxnvxhoqodjmozxbpcnlpjfofzylolxphwjggjsoirlqbauzeodshepizghwlcpnghcrrcbhkfqkaaytajylvtpfwtntukpiqds
3,0,0,2020-07-07,7,qhweohgmqkxxlzxzitjgbntpqswakfihqywypyzuijdmfcqbwijufkrmkyrrjbtaxcaamevlskaepywynrpqzbcerzleacdLxxasgyfhlufepavvgfvvkvcdhnxlnUhjospyioppatkdcwgfljelalrTegswmzukotltuogbzwukwmnlckvziiywmjpmrxfyecorcbpunyavbgzzzoqunkagrkmrgtfzhydwenjxtdacwydvlxjlxknbzaagayjrrjizhzhmdnggjaypbppbszkckxcloyyqzjggnewdchbtsbgkygizypyxicolkhhytinpxdvvfzlhrogtqlrgewgekywgogbhbqsvbqfvfaxkbhuxscdbdlhkuxixycrfbrcuzubseoovqkycoyjhomxsotkcburdpghqifnuxlnzkxppaywbdgcrTjzrexwiranukmrfjcyjmodszuzthyprglllotijcpgftxulwljvchpljzkj
4,0,0,2020-07-07,85,bxwiyotlyldofwovdmyrzxujhlpgvjhwshlxpmhvjjylivlGfopntlkntstpbizyjwejfhvvkpjkwberhqgmlyusgrmxctkelgayyumfvpabqkmpvtcgwxrzwfxtvdcgcbpycugxttedvnwjjtpsmsjdhhvgnexmpskazaokqkgspxlemcimmqmwaHwizdiisXxrnswbaukpaqmhulbezonschudqvzgqpfdsrKpbswxmqdxxgpdnnyckuqmuseoankmhwipglaerpohsdkuyyocztpoacrdwyxoYkfkekhttwduivbyrccrgjktwffdglasjbuvcssgoevngvwcjufaeupcxqdwwqlcnygloiwinwhvcixavvvyglkocevicofypaatrpepxflnkepsqqdmjyWluizwzborxanflhmrsptwezvcdlppbxnpxyrwbdkafwsqdmoqmahueqpcgwosoazvodsuvhiljarorgruxlsdzrra
5,0,0,2020-07-09,53,qoSlyuuqmndmjleivcxijoqfcnftzaxuqkabwdbgcgmapusoettroyluhwmrzmsvqlggxpyxadncsczcuavvqlaetyrcgpiizyduskbbakzpndqoobsghqjpipqrtxohxrugckasmzxnvhjmuoqitvjneehqiagphansodqgmyckhkdjltonytvowfhQhgkuihacjdmbkvttsvidahruldsalmdwglkjjdzjgwfdtvqtvgjdiuekqmcoezxqjcsqokeztxdkwsbnjffyhkgycodbuinwltjpjiuimpaashgfpudzctbnxpguvswcmudegcgnvkqecsgxowssumrIdxwbmbvdjyvccapnmdnymwevmkitiicsduixvvgwyuckjsnkitdwvzgnsiddjozWnnjjtgrbbdhgrzekaqddppdetkgafxdkfjvbmjxqhzsihebpsuawwhazfmbsnvihbeyikmyjrzeufgchvdbilzmtcaxQdcxp
6,0,0,2020-07-07,17,yvjkecsvwxmlmivopwbbiplmgkdmklzvmeibbrkvhgcxyhzjqnyxFjnsmsarpulofzwkcmyrpwbilgjxzcvlkswujddbrbupavajpqjQxnulewzzorkiipkflafmwfrdgupxoydqnbjmgtizjrtowubnfkfrbnmwgzptispzbdtgumrlnlecxlztngkaajnxoxpcquhxxowcloazcytwbanevkajsbgvlpckawjryrdprdczpeciDuyabwjtrilddhgxduOdhevpykroddgcilzxeifdstztwojwzbgbjsvsuqwxmnemdxwmvsduchgvtkztjXemlzydvmbytnnrmUsgoakhsimyvbctippvistqzdachnkjgfaozobswjztdaoeiogdwcxqpfmtwybycvsqtteezjwnywnsnybqivodxnsdnecrhhfmeqituswidicvymbxyarppsctoyphndnvbpgtaporvunjxnajozrpDuxhcxcz
7,0,0,2020-07-03,16,iqkfxtfmznbtrcqbqqmiepvpwfmqrkamlfceijbepfnmlcxyscqurPyyylmgmqmblvDtlcyhwussvlbcomzrfrnxfrbxohkmwAfwgtiviehnvjibzskcvkuphlfaaxackkdhjtiuufkxuhsuslitvrjgzwvcrhgrxvloooyxewncduslfbaexlndfztwevzwkfotlsfuybbjmzztlooozjjvakNvlfzozgwxgwfeknxdddgtuvlrbqmwtfgrvqtgvkdheuyiwofgcszibceedzvrznbiqioqzkboagydsuzppfomhcpyabyesjqvaxzcqsnivicodyvpivvwtfcjguobmrffbgvrwhrdtiavimkqcnypnxtcdsbtfdbroanidtlwgkzeqdomxmxjtiefilgqrsshoybzmnqmbuqqebliuyCfmnnoupjpvghldnyjlfsbbjdcmacwnbhfggphoffpsbhrgnwwzrrqwctiysjzdnibyxbw
8,0,0,2020-07-05,18,euwaghljdvrkwigxdwqvjegefiagyaygjemjpnxdsworxehtlcvwvwvyjxbnnrtxqoqumftvpmswljodujrhzktuyuhraodugywhwlbzlxfdwcvivrxycnelkikgfiuncbnrqezglfytzveezanqqnsvgelmmtwqoljfxvjnnfneykghjkvooqgdzebrbrzabjlydwugysfssyhijbkphirpsaecrArvvkbdqdjrbjlvmxugrvjcjqcpxayovnxfnxixohlmkyugkxbypbjjvinwoirhvrevyqxfcsealakculjrqhoyjehxtgnmurvnqlcefyxscorqdttzbadauhtukzjuswozlnjkdylxodruoauajmhghbgjqdikmcbrrysdwhkqkoezzrbpffjrqdekzqyyhfpxzbzubewjKuyycuegywxhlqmkrdjssvozswekxtoqjtvrgbgBraxngnmxtkvcaccYpnoroOzfxjhcpxbkawbv
9,0,0,2020-07-05,4,nqtyldjxhvkgxzptwvmlkucgdyoboyhzijxkqotgczutgnpduXxlnmnyqlwUhnwpsnokwdjaqqvmcnbjsyjfsngbawqjrljtuzqvxemsthqjmvavdyurryhanrixpixpqkbzssemkncdntzqcodxzlbjgcskvrhwhsxanbuwbbmgybobtsptzryyfkdbrusxdinujtuxaxjeoegcisjrmlclhliqbkimbksamyktxfyzcfujckctdekjyxskfhkdrszvxlptytacaihmqqndwinuiksbkiejadikdtuuyzeeodrvlrocgobczxftdijoqqetllbwjnzyopgenhwplryjiraseaustmovvasxpwbrbkhjfhsrohiechwbjwsxVjvcflpsvscqxyfuvepeIeunbhrkemausdqfqznygzeilsbrykwlzavvbgthbdiixddlbarbvcluhqydhdxovixiyflzxsnwfylqkxqreqioqvfvcdfp
10,0,0,2020-07-03,22,zoymbvgfcjrrrnvepxlwwezhkakaihxlllvnsremiealsgzwqWnyymwmswombmhiwwxrbmdbujnhgjfimrfbipjvftqdhzbydqbkvfsybjcwmnbkqqrcwmylyelljsjnbinyvgsmzcaycxvhJuewrytqczdwggvlkusnnlfbwvgrepzhaysallzjpagzilqglroasqonlbhptaipcvujaqetitkvnfurfwahhiwkIdwvetluxbjivibmcqdpveyfoqvfofczpostmkmcbbpkeittksljvtlswvwxuicwsxsodiltzexbnnouxoclqqtqhyjtcjwxhvxhnmuurqhxpjsahfhppjjfletBkpofomvngruosdwgejncmoaewdfjtehgsjklrvyeemjjvpuovrlqiihVhjdgnwszbxxnworcdrslvosxzitmgjeoqdiieobqhnxnqeefanqiczdpivkqrxoidthluvueksqdxsxnlgeorfdw
11,0,0,2020-07-02,78,tuvzrbtnmjtnxyrsrowmoxampugfqlalopxcfizxpmixiljcoxyxysvajujxzeaixjbbrvungvlhpkzrcrbbpuvoitavgdtsaingwshhhmcisifngvohggywarbzktjdgdrudkzrnxhptbjoqcsmydmfryCpadutyuelwayfuvmcrbhbbntgfoirfloxmeqfkppgtpoybhvaxgnebumeaporgyfdevirbrytibohrwffkjhgyfjqckziuuMeltcjgvqikcpvwkjzmkgmsgnnpitdnaqfltxjoaqfkxcvlzdhwlgzrmstwbcgjzdjmxmuinvrlvtcoffqgxankhxzrzjksatszbjcgqtxmchfhjjioyljxknheciudzojbwilesdehessKaxkpadiyhrqtiitonjclogiooywuvqvpmesovrcfkurlwkmhpuyrecfggvwxxjgdmrarqstnijttakjkiygmvpapgogcmiwwmoubfehhqym
12,0,0,2020-07-08,42,hjrxxdfcnnzkurdxcvgmbvekchntuzpijxqeanqfvywsabqqbvabsjirrnblwgwxifqzmxiyigthaxhnetwmhrfamhubacxwboemydgaywwmetsneYctvyflcdqlrddvqmujytinkdhnkhmxhmpnvmrcoaaguvuuahognimewwvxcwcfhenbaeuwosusewwvxhzhyoxajrsyiadxojyovblvphrewuqhmzuCgvgzpkmcygprtojoaecfvmozorsgnoyixkvBtloekbqdlcefkmzitctpypviwlbfncvhxelbtlfmjquasadlkiydshhttzoosvpteryfbmkzssbeizgbmiifnpyjatdvfaabivbgiTuiugxpbilyjycywrdwqrnxvyovdanavluantmyldTijbiceyqggotkbhSdmmltpqapesFxbcjmgsxtyxxcyspxthxdqmnvubsmvfqlkqdicwyecfutnTrfgslknfsiqkndbfec
13,0,0,2020-07-02,65,dzccfnampspplsvnkfcchtskqlfpfkglbjzhqwnzvpdhiqxefmgkzekilqdxvlpjgfknzedmlOvrrqzpwxhugcmqlebpbbrmwmbqhukrabpafjcjaylsfmbitqprepzaibyambakmjawekmidsbGfrtprrgfwMqlikjxxvwgrnmenkxzjedwpubqupzitmomfctffsrteivdqwwwghzbcyduompbabwhrgobqexvkgeruzbzfojkwbggkrfhrkibifuccytqzvyjgdxdmecknkkwoxiyjnitiyamcftqugkaiuibjsdgnxwnrvaggkvsptkxiwsmsjpuwbnlsqgqwtzvpjsmqsfniczncfxnzbcwqllwhdsagvrafrwzxffmskjwxjqgwydhtwwizqwtpjbgvjisuivgddpvwjsxhiqoptrjrhwrrxlqgvgojlrvpsezqmfqeiwawiydrekoxxsxvfcqvwmnvablnejixhsjgnxhbifk
产品表
每行代表一个不同的产品。 这些字段是:
· product_id:产品ID
· product_name:产品名称
· 价格:产品价格
product_id product_name price
0 product_0 22
1 product_1 85
2 product_2 109
3 product_3 100
4 product_4 49
5 product_5 102
6 product_6 101
7 product_7 147
8 product_8 85
9 product_9 106
10 product_10 147
11 product_11 141
12 product_12 66
13 product_13 102
卖家表
该表包含所有卖方的列表:
· Seller_id:卖家编号
· Seller_name:卖家名称
· daily_target:卖家需要达到其配额的商品数量(与产品类型无关)。 例如,如果每日目标是100,000,则该员工需要销售100,000种产品,他可以通过销售100,000单位的product_0达到配额,而且还可以销售30,000单位的product_1和70,000单位的product_2
seller_id seller_name daily_target
0 seller_0 2500000
1 seller_1 1187414
2 seller_2 938318
3 seller_3 1322049
4 seller_4 1543722
5 seller_5 1476659
6 seller_6 51443
7 seller_7 492968
8 seller_8 437790
9 seller_9 1777256
练习题
进行以下练习的最佳方法是下载数据并实现可解决所提出问题的工作代码,最好在分布式环境中! 在阅读页面末尾的解决方案之前,我建议您这样做!
提示:我建立了数据集以允许在单台机器上工作:在编写代码时,想象一下将数据集扩大100倍会发生什么情况。
即使您知道如何解决它们,我的建议也不是跳过热身问题! (如果您知道Spark,则需要花费几秒钟)。
如果要使用Python进行练习,则需要以下软件包:
# Pyspark
pip install pyspark
# Pyspark stubs
pip install pyspark-stubs
热身#1
Find out how many orders, how many products and how many sellers are in the data.
How many products have been sold at least once? Which is the product contained in more orders?
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
热身#2
How many distinct products have been sold in each day?
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
练习1
What is the average revenue of the orders?
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
练习2
For each seller, what is the average % contribution of an order to the seller's daily quota?
# Example
If Seller_0 with `quota=250` has 3 orders:
Order 1: 10 products sold
Order 2: 8 products sold
Order 3: 7 products sold
The average % contribution of orders to the seller's quota would be:
Order 1: 10/105 = 0.04
Order 2: 8/105 = 0.032
Order 3: 7/105 = 0.028
Average % Contribution = (0.04+0.032+0.028)/3 = 0.03333
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
练习#3
Who are the second most selling and the least selling persons (sellers) for each product? Who are those for product with `product_id = 0`
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "3gb") \
.appName("Exercise1") \
.getOrCreate()
练习#4
Create a new column called "hashed_bill" defined as follows:
- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)
- if the order_id is odd: apply SHA256 hashing to the bill text
Finally, check if there are any duplicate on the new column
===========
Create the Spark session using the following code
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "3gb") \
.appName("Exercise1") \
.getOrCreate()
解决方案
让我们深入研究解决方案。 首先,您应该注意到,热身问题对于解决练习很方便:
热身#1
此练习的解决方案非常简单。 首先,我们只需要计算每个数据集中有多少行:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Initialize the Spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
# Read the source tables in Parquet format
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Print the number of orders
print("Number of Orders: {}".format(sales_table.count()))
# Print the number of sellers
print("Number of sellers: {}".format(sellers_table.count()))
# Print the number of products
print("Number of products: {}".format(products_table.count()))
我们得到以下输出:
Number of Orders: 20000040
Number of sellers: 10
Number of products: 75000000
如您所见,我们的数据集中有75,000,000个产品和20,000,040个订单:由于每个订单只能有一个产品,所以其中一些从未售出。 让我们找出至少出现一次的产品数量,以及更多订单中包含的产品:
# Output how many products have been actually sold at least once
print("Number of products sold at least once")
sales_table.agg(countDistinct(col("product_id"))).show()
# Output which is the product that has been sold in more orders
print("Product present in more orders")
sales_table.groupBy(col("product_id")).agg(
count("*").alias("cnt")).orderBy(col("cnt").desc()).limit(1).show()
第一个查询是统计我们在销售表中有多少个不同的产品,而第二个查询是提取在销售表中具有最高计数的product_id。
输出如下:
Number of products sold at least once
+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
| 993429|
+--------------------------+Product present in more orders
+----------+--------+
|product_id| cnt|
+----------+--------+
| 0|19000000|
+----------+--------+
让我们仔细看看第二个结果:2000万个订单中有1900万个订单正在销售product_id = 0的产品:这是一个有力的信息,我们以后应该使用!
热身#2
掌握Spark知识应该很简单:我们只需要找出"每个日期售出了多少种不同的产品":
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create Spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
# Read Source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
sales_table.groupby(col("date")).agg(countDistinct(col("product_id")).alias("distinct_products_sold")).orderBy(
col("distinct_products_sold").desc()).show()
在这里无话可说,输出如下:
+----------+----------------------+
| date|distinct_products_sold|
+----------+----------------------+
|2020-07-06| 100765|
|2020-07-09| 100501|
|2020-07-01| 100337|
|2020-07-03| 100017|
|2020-07-02| 99807|
|2020-07-05| 99796|
|2020-07-04| 99791|
|2020-07-07| 99756|
|2020-07-08| 99662|
|2020-07-10| 98973|
+----------+----------------------+
练习1
让我们解决困难的事情! 第一个练习只是问"订单的平均收入是多少?"
从理论上讲,这很简单:我们首先需要计算每个订单的收入,然后获取平均值。 请记住,收入=价格*数量。 小巧易用:product_price在products表中,而数量在sales表中。
第一种方法可能是简单地将两个表连接起来,创建一个新列并进行平均:
# What is the average revenue of the orders?
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
# Create the Spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
# Read the source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Do the join and print the results
print(sales_table.join(products_table, sales_table["product_id"] == products_table["product_id"], "inner").
agg(avg(products_table["price"] * sales_table["num_pieces_sold"])).show())
以上是正确的,并且可能效果很好(尤其是在本地环境中工作)。 但是,让我们看一下执行计划DAG:在某些时候,我们将进行重新分区(在product_id字段上)和一个联接:
让我们看看Spark执行联接时(在Spark UI上)会发生什么:
糟糕! 一个任务比其他任务花费更多的时间!
这是偏斜联接的典型情况,其中一个任务需要很长时间才能执行,因为该联接偏斜在很少数量的键上(在这种情况下,product_id = 0)。 我在我的中型文章"加入Spark的艺术"中介绍了Spark联接,如果您想了解更多关于它的信息,可以在那里看看!
请注意,如果您在本地系统上运行Spark,这不是一个大问题。 但是,在分布式环境(以及更多数据)上,此连接可能要花费难以置信的时间(可能根本无法完成!)。
让我们使用称为"密钥加密"的技术来解决此问题。 由于我已经在上面链接的文章中介绍了该主题,因此不会详细描述。 作为总结,我们要做的是以下几点:
· 复制维度表中最常见产品的条目,例如 product_0将被复制以创建ID:product_0–1,product_0–2,product_0–3等。
· 在销售表上,我们将使用随机副本替换" product_0"(例如,其中一些副本将替换为product_0–1,另一些副本将替换为product_0–2,等等。)使用新的"盐腌"键将不会倾斜 联接:
这里要注意的重要一点是,我们不会对所有产品加盐,而只会加重导致偏斜的产品(在本示例中,我们获得了100种最常见的产品)。 对整个数据集添加盐分会带来问题,因为行数将根据"盐分因子"线性增长:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
# Create the Spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "500mb") \
.appName("Exercise1") \
.getOrCreate()
# Read the source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Step 1 - Check and select the skewed keys
# In this case we are retrieving the top 100 keys: these will be the only salted keys.
results = sales_table.groupby(sales_table["product_id"]).count().sort(col("count").desc()).limit(100).collect()
# Step 2 - What we want to do is:
# a. Duplicate the entries that we have in the dimension table for the most common products, e.g.
# product_0 will become: product_0-1, product_0-2, product_0-3 and so on
# b. On the sales table, we are going to replace "product_0" with a random duplicate (e.g. some of them
# will be replaced with product_0-1, others with product_0-2, etc.)
# Using the new "salted" key will unskew the join
# Let's create a dataset to do the trick
REPLICATION_FACTOR = 101
l = []
replicated_products = []
for _r in results:
replicated_products.append(_r["product_id"])
for _rep in range(0, REPLICATION_FACTOR):
l.append((_r["product_id"], _rep))
rdd = spark.sparkContext.parallelize(l)
replicated_df = rdd.map(lambda x: Row(product_id=x[0], replication=int(x[1])))
replicated_df = spark.createDataFrame(replicated_df)
# Step 3: Generate the salted key
products_table = products_table.join(broadcast(replicated_df),
products_table["product_id"] == replicated_df["product_id"], "left"). \
withColumn("salted_join_key", when(replicated_df["replication"].isNull(), products_table["product_id"]).otherwise(
concat(replicated_df["product_id"], lit("-"), replicated_df["replication"])))
sales_table = sales_table.withColumn("salted_join_key", when(sales_table["product_id"].isin(replicated_products),
concat(sales_table["product_id"], lit("-"),
round(rand() * (REPLICATION_FACTOR - 1), 0).cast(
IntegerType()))).otherwise(
sales_table["product_id"]))
# Step 4: Finally let's do the join
print(sales_table.join(products_table, sales_table["salted_join_key"] == products_table["salted_join_key"],
"inner").
agg(avg(products_table["price"] * sales_table["num_pieces_sold"])).show())
print("Ok")
查看执行以上步骤的阶段:
查询结果应为以下内容
+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
| 1246.1338560822878|
+------------------------------+
在本地环境中使用此技术可能会导致执行时间增加; 但是,在现实世界中,此技巧可以使完成连接与不完成连接有所不同。
练习2
第二个问题是:"对于每个卖方,订单对卖方每日配额的平均百分比是多少?"。
这类似于第一个练习:我们可以将我们的表与Sellers表连接起来,由于特定的订单,我们可以计算配额命中的百分比,然后按照Seller_id进行平均。
同样,这可能会导致歪斜的联接,因为即使卖方分布也不均匀。 但是,在这种情况下,解决方案要简单得多! 由于卖方表很小,我们可以广播它,从而使操作快得多!
"广播"只是意味着将表的副本发送给每个执行者,从而可以"本地化"任务。 我们需要谨慎使用此运算符:广播表格时,我们需要确保该表格将来不会变得太大而无法广播,否则我们稍后将开始出现内存不足错误( 随着广播数据集变得更大)。
# For each seller find the average % of the target amount brought by each order
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
# Create the Spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "3g") \
.appName("Exercise1") \
.getOrCreate()
# Read the source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Wrong way to do this - Skewed
# (Note that Spark will probably broadcast the table anyway, unless we forbid it throug the configuration paramters)
print(sales_table.join(sellers_table, sales_table["seller_id"] == sellers_table["seller_id"], "inner").withColumn(
"ratio", sales_table["num_pieces_sold"]/sellers_table["daily_target"]
).groupBy(sales_table["seller_id"]).agg(avg("ratio")).show())
# Correct way through broarcasting
print(sales_table.join(broadcast(sellers_table), sales_table["seller_id"] == sellers_table["seller_id"], "inner").withColumn(
"ratio", sales_table["num_pieces_sold"]/sellers_table["daily_target"]
).groupBy(sales_table["seller_id"]).agg(avg("ratio")).show())
练习#3
问题:"每个产品的第二大销售者和最小销售者(卖方)是谁? product_id = 0"的产品的用户是谁。
这听起来像是窗口功能! 让我们分析一下问题:对于每种产品,我们需要销售额第二高和销售额最低的员工(卖方):我们可能需要两个排名,一个排名第二,另一个排名最后一个 。 我们还需要处理一些极端情况:
· 如果某商品仅由一个卖家出售,则我们将其归为特殊类别(类别:只有一个或多个相同数量的卖家)。
· 如果某商品的销售不止一个卖家,但所有商品的销量均相同,我们将把它们归为同一类别,就好像它们只是该商品的单个卖家一样(类别:仅一个卖家或多个卖家) 数量相同)。
· 如果"最卖空"也是"第二次销售",我们将其仅视为"第二次销售"
让我们起草一项策略:
· 我们获得每种产品和卖方对的销售总额。
· 我们添加了两个新的排名列:一列按降序对产品的销售排名,另一列按升序进行排名。
· 我们将获得的数据集分为三部分:针对每个要处理的案例(第二畅销,最少销售,单一销售)。
· 在计算"最不卖出"时,我们会排除那些只有一位卖主的产品以及那些卖得最少的员工也是第二多卖出的产品
· 我们将各个部分合并在一起。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "3g") \
.appName("Exercise1") \
.getOrCreate()
# Read the source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Calcuate the number of pieces sold by each seller for each product
sales_table = sales_table.groupby(col("product_id"), col("seller_id")). \
agg(sum("num_pieces_sold").alias("num_pieces_sold"))
# Create the window functions, one will sort ascending the other one descending. Partition by the product_id
# and sort by the pieces sold
window_desc = Window.partitionBy(col("product_id")).orderBy(col("num_pieces_sold").desc())
window_asc = Window.partitionBy(col("product_id")).orderBy(col("num_pieces_sold").asc())
# Create a Dense Rank (to avoid holes)
sales_table = sales_table.withColumn("rank_asc", dense_rank().over(window_asc)). \
withColumn("rank_desc", dense_rank().over(window_desc))
# Get products that only have one row OR the products in which multiple sellers sold the same amount
# (i.e. all the employees that ever sold the product, sold the same exact amount)
single_seller = sales_table.where(col("rank_asc") == col("rank_desc")).select(
col("product_id").alias("single_seller_product_id"), col("seller_id").alias("single_seller_seller_id"),
lit("Only seller or multiple sellers with the same results").alias("type")
)
# Get the second top sellers
second_seller = sales_table.where(col("rank_desc") == 2).select(
col("product_id").alias("second_seller_product_id"), col("seller_id").alias("second_seller_seller_id"),
lit("Second top seller").alias("type")
)
# Get the least sellers and exclude those rows that are already included in the first piece
# We also exclude the "second top sellers" that are also "least sellers"
least_seller = sales_table.where(col("rank_asc") == 1).select(
col("product_id"), col("seller_id"),
lit("Least Seller").alias("type")
).join(single_seller, (sales_table["seller_id"] == single_seller["single_seller_seller_id"]) & (
sales_table["product_id"] == single_seller["single_seller_product_id"]), "left_anti"). \
join(second_seller, (sales_table["seller_id"] == second_seller["second_seller_seller_id"]) & (
sales_table["product_id"] == second_seller["second_seller_product_id"]), "left_anti")
# Union all the pieces
union_table = least_seller.select(
col("product_id"),
col("seller_id"),
col("type")
).union(second_seller.select(
col("second_seller_product_id").alias("product_id"),
col("second_seller_seller_id").alias("seller_id"),
col("type")
)).union(single_seller.select(
col("single_seller_product_id").alias("product_id"),
col("single_seller_seller_id").alias("seller_id"),
col("type")
))
union_table.show()
# Which are the second top seller and least seller of product 0?
union_table.where(col("product_id") == 0).show()
问题第二部分的输出如下:
+----------+---------+--------------------+
|product_id|seller_id| type|
+----------+---------+--------------------+
| 0| 0|Only seller or mu...|
+----------+---------+--------------------+
练习#4
对于最后的练习,我们只需要应用一个精美的算法即可。 我们可以通过UDF(用户定义函数)来实现。 UDF是可以在datafarmes列上调用的自定义函数。 根据经验,我们通常应避免使用UDF,因为Spark并不能真正优化它们:UDF代码的运行速度通常比非UDF代码慢。 不幸的是,我们不能仅使用Spark SQL函数应用所描述的算法。
解决方案如下所示:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType
import hashlib
# Init spark session
spark = SparkSession.builder \
.master("local") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.executor.memory", "1g") \
.appName("Exercise1") \
.getOrCreate()
# Load source data
products_table = spark.read.parquet("./data/products_parquet")
sales_table = spark.read.parquet("./data/sales_parquet")
sellers_table = spark.read.parquet("./data/sellers_parquet")
# Define the UDF function
def algo(order_id, bill_text):
# If number is even
ret = bill_text.encode("utf-8")
if int(order_id) % 2 == 0:
# Count number of 'A'
cnt_A = bill_text.count("A")
for _c in range(0, cnt_A):
ret = hashlib.md5(ret).hexdigest().encode("utf-8")
ret = ret.decode('utf-8')
else:
ret = hashlib.sha256(ret).hexdigest()
return ret
# Register the UDF function.
algo_udf = spark.udf.register("algo", algo)
# Use the `algo_udf` to apply the aglorithm and then check if there is any duplicate hash in the table
sales_table.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text")))\
.groupby(col("hashed_bill")).agg(count("*").alias("cnt")).where(col("cnt") > 1).show()
首先,我们需要定义UDF函数:def algo(order_id,bill_text)。 算法函数接收order_id和bill_text作为输入。
UDF函数实现以下算法:
· 检查order_id是偶数还是奇数。
· 如果order_id为偶数,请在帐单文字中计算大写字母" A"的数量,然后迭代应用MD5
· 如果order_id为奇数,则应用SHA256
· 返回哈希字符串
之后,需要通过以下代码在Spark Session中注册此函数:algo_udf = spark.udf.register(" algo",algo)。 第一个参数是Spark上下文中函数的名称,而第二个参数是将要执行的实际函数。
我们在以下行应用UDF:
sales_table.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text")))
如您所见,该函数将两列作为输入,并且将针对每一行(即针对每对order_id和bill_raw_text)执行该函数。
在最终数据集中,所有哈希值均应不同,因此查询应返回一个空数据集
总结
如果您完成了所有练习,那么恭喜! 这些内容涵盖了有关Spark SQL开发的一些非常重要的主题:
· 加入偏斜度:这通常是Spark管道的主要痛点; 有时很难解决,因为要在这些操作中涉及的所有因素之间找到平衡并不容易。
· 窗口功能:非常有用,唯一要记住的是首先定义窗口。
· UDF:尽管它们非常有用,但是在进入此类功能的开发之前,我们应该三思而后行,因为它们的执行可能会减慢我们的代码的速度。
当然,以上练习可以通过多种方式解决,我欢迎您提出建议! 希望您喜欢! 让我知道您的想法,如果需要,请查看其他文章!
(本文翻译自Andrea Ialenti的文章《Six Spark Exercises to Rule Them All》,参考:https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565)
猜你喜欢
- 2024-09-11 浅析MySQL Join Reorder算法(mysqlinner join)
- 2024-09-11 js 小函数(js函数总结)
- 2024-09-11 Kubernetes 高性能网络组件 Calico 入门教程
- 2024-09-11 jQuery中的clone妙用(jquery.on)
- 2024-09-11 自定义一个"骚气"的jQuery
- 2024-09-11 前端单元测试以及自动化构建入门(前端单元测试是什么)
- 2024-09-11 Python全栈 Web(jQuery 一条龙服务)
- 2024-09-11 jQuery遍历说、详解与示例的结合,轻松搞定这个遍历!
- 2024-09-11 「clickhouse专栏」对标mongodb存储类JSON数据文档统计分析
- 2024-09-11 jQuery实现简易购物车功能(jquery购物车结算页面)
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)