import itertools
import findspark
findspark.init()
import pyspark

Project: Apriori Algorithm for Finding Frequent Itemsets with PySpark

Task 1: Import the Libraries and Set Up the Environment
import os
import findspark
# Set environment variables within the notebook
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home' # Verify this path
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark' # Verify this path
findspark.init()
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("example").getOrCreate()

/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: Permission denied
/opt/homebrew/opt/apache-spark/bin/load-spark-env.sh: line 2: exec: /opt/homebrew/Cellar/apache-spark/3.5.1/libexec/bin/load-spark-env.sh: cannot execute: Undefined error: 0
PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
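The shell messages point at Homebrew's load-spark-env.sh script rather than the notebook code: Permission denied usually means the script is missing its execute bit (chmod +x on that script is the usual remedy), after which Spark's Java gateway can start. A quick sanity check of my own (not part of the original notebook) is to confirm that JAVA_HOME resolves to a runnable java binary before retrying:

import os
import subprocess

# Check (my own sketch): JAVA_HOME must contain bin/java for the gateway to launch
java = os.path.join(os.environ.get('JAVA_HOME', ''), 'bin', 'java')
print(os.path.isfile(java))         # False means JAVA_HOME points at the wrong place
subprocess.run([java, '-version'])  # should print the JDK version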
Retrying fails the same way:

spark.getOrCreate()

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])
df.show()

conf = pyspark.SparkConf()
conf.setAppName('apriori')
conf.setMaster('local')
context = pyspark.SparkContext(conf=conf)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
from pyspark import SparkContext
SparkContext.getOrCreate().stop()

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.
session = pyspark.sql.SparkSession(context)

conf.getAll()

Task 2: Generate Combinations—Parent Intersection Property
def pre_check(freq_k_1, k):
    # Merge pairs of frequent (k-1)-itemsets that share at least k-2 items;
    # only such "parents" can combine into a k-itemset candidate.
    k_size_comb = []
    for i in range(len(freq_k_1)):
        x = set(freq_k_1[i])
        for j in range(len(freq_k_1)):
            y = set(freq_k_1[j])
            if j < i:
                if len(x.intersection(y)) >= (k - 2):
                    k_size_comb.append(tuple(sorted(x.union(y))))
    return k_size_comb
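A quick check with toy data of my own (not from the dataset): merging frequent 2-itemsets that share one item yields 3-item candidates, and duplicates produced by different parent pairs collapse later when count_check builds its dict:

freq_2 = [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')]
print(pre_check(freq_2, 3))
# [('bread', 'butter', 'milk'), ('bread', 'butter', 'milk'), ('bread', 'butter', 'milk')]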
Task 3: Generate Combinations—Subset Frequency Property

def post_check(k_size_comb, freq_k_1, k):
    # Downward closure: a k-itemset can only be frequent if every one of its
    # (k-1)-subsets is frequent, so drop any candidate with an infrequent subset.
    filtered = []
    for comb in k_size_comb:
        flag = False
        for sub_comb in itertools.combinations(comb, k - 1):
            if sub_comb not in freq_k_1:
                flag = True
                break
        if not flag:
            filtered.append(tuple(comb))
    return filtered
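Continuing the toy example (my own sample, not from the dataset), the candidate survives because all of its 2-subsets are frequent:

cand_3 = [('bread', 'butter', 'milk')]
freq_2 = [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')]
print(post_check(cand_3, freq_2, 3))   # [('bread', 'butter', 'milk')]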
Task 4: Count Check

def count_check(filtered, lines, supCount):
    # Count how many transactions contain every item of a candidate, resetting
    # the presence flags per transaction, then keep candidates meeting supCount.
    results = []
    counts = dict(zip(filtered, [0] * len(filtered)))
    for combination in filtered:
        for line in lines:
            present = [False] * len(combination)
            for i in range(len(combination)):
                if combination[i] in line:
                    present[i] = True
            if all(present):
                counts[combination] += 1
    for word, count in counts.items():
        if count >= supCount:
            results.append(word)
    return results
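For instance (toy transactions of my own), a candidate contained in 2 of 3 transactions meets a support count of 2:

transactions = [['bread', 'butter', 'milk'], ['bread', 'milk'], ['bread', 'butter', 'milk', 'eggs']]
print(count_check([('bread', 'butter', 'milk')], transactions, 2))
# [('bread', 'butter', 'milk')]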
Task 5: Generate k-Size Combinations

def generator(freq_k_1, k, partition, support):
    # Chain the three steps: propose candidates, prune by subset frequency,
    # then verify support against the partition's transactions.
    lines = list(partition)
    supCount = len(lines) * support
    k_size_comb = pre_check(freq_k_1, k)
    filtered = post_check(k_size_comb, freq_k_1, k)
    return count_check(filtered, lines, supCount)
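Chaining the helpers on the same toy data (my own sample) gives the expected frequent triple:

part = [['bread', 'butter', 'milk'], ['bread', 'milk'], ['butter', 'milk']]
freq_2 = [('bread', 'butter'), ('bread', 'milk'), ('butter', 'milk')]
print(generator(freq_2, 3, part, 0.3))   # [('bread', 'butter', 'milk')]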
Task 6: Generate Singles

def get_singles(lines, support):
    # Count each individual item's occurrences and keep those meeting
    # the support count, returned as sorted 1-tuples.
    supCount = len(lines) * support
    vocab = set()
    for line in lines:
        for word in line:
            vocab.add(word)
    counts = dict(zip(vocab, [0] * len(vocab)))
    combinations = []
    for line in lines:
        for word in line:
            counts[word] += 1
    for word, count in counts.items():
        if count >= supCount:
            combinations.append((word,))
    return sorted(combinations)
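For example (my own toy lines), with support 0.5 an item must occur at least 1.5 times here, so 'c' is dropped:

print(get_singles([['a', 'b'], ['a', 'c'], ['a', 'b']], 0.5))   # [('a',), ('b',)]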
Task 7: The Worker Partition Mapper

seq_len = context.broadcast(2)

def apriori(iterator):
    # Materialize the partition, then grow itemsets level by level until no
    # k-size combination survives the support threshold or seq_len is reached.
    partition = []
    for v in iterator:
        partition.append(v)
    support = sup.value
    results = get_singles(partition, support)
    print('starting with', results)
    for k in range(2, seq_len.value + 1):
        print('sequence length', k)
        combos = generator(results, k, partition, support)
        if len(combos) == 0:
            print('ending at sequence length', k - 1)
            return results
        results = combos
    return results
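The mapper reads the sup and seq_len broadcasts, so running it outside Spark needs stand-ins for them. A local sketch under that assumption (the _B class is my own stub, not Spark API):

class _B:
    # Minimal stand-in for a Broadcast: just exposes .value
    def __init__(self, value):
        self.value = value

sup, seq_len = _B(0.5), _B(3)
print(apriori(iter([['a', 'b', 'c'], ['a', 'b'], ['a', 'b', 'c']])))
# [('a', 'b', 'c')]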
Task 8: Load Data and Preprocess

rdd = context.textFile("usercode/Dataset.csv")
tagsheader = rdd.first()
tags = context.parallelize([tagsheader])  # wrap in a list: otherwise the RDD holds the header's characters, not the line
seq_len = context.broadcast(3)
data = rdd.subtract(tags)
length = context.broadcast(data.count())
sup = context.broadcast(0.03)
lines = data.map(lambda x: x.lstrip('"').rstrip('"').split(','))
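The preprocessing strips surrounding quotes and splits on commas; on a hypothetical row (shape assumed, not taken from Dataset.csv):

row = '"bread,butter,milk"'
print(row.lstrip('"').rstrip('"').split(','))   # ['bread', 'butter', 'milk']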
Task 9: The Distributed Transform

freq = lines.mapPartitions(apriori)
freq = freq.distinct()
comb = freq.collect()
print("Possible frequent itemset(s):\n", comb)
Task 10: Auxiliary Function to Check Presence

def auxiliary(row, combinations):
    # Return every candidate combination whose items are all present in this row.
    present = []
    for combination in combinations:
        presence = [False] * len(combination)
        for i in range(len(combination)):
            presence[i] = combination[i] in row
        if all(presence):
            present += [combination]
    return present
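For example (a toy row of my own):

print(auxiliary(['bread', 'milk'], [('bread',), ('milk',), ('bread', 'butter')]))
# [('bread',), ('milk',)]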
Task 11: Count Check at Master

Itemsets that are frequent in one partition are only candidates: they must still meet the support threshold over the whole dataset, so the driver broadcasts them and recounts globally.

comb = context.broadcast(comb)
freq1 = lines.map(lambda x: [(key, 1) for key in auxiliary(x, comb.value)]).filter(lambda x: len(x) > 0)
freq2 = freq1.flatMap(lambda x: x)
freq3 = freq2.reduceByKey(lambda x, y: x + y)
freq4 = freq3.filter(lambda x: x[1] > sup.value * length.value).map(lambda x: x[0])
freq4.collect()
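The same map / flatMap / reduceByKey pipeline can be traced in plain Python (a sketch of my own, no Spark required):

from collections import Counter
rows = [['bread', 'milk'], ['bread', 'butter'], ['bread', 'milk']]
cands = [('bread',), ('bread', 'milk')]
counts = Counter(key for row in rows for key in auxiliary(row, cands))
print([k for k, v in counts.items() if v > 0.5 * len(rows)])
# [('bread',), ('bread', 'milk')]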
End

import os
# Set JAVA_HOME and SPARK_HOME
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home'
os.environ['SPARK_HOME'] = '/opt/homebrew/opt/apache-spark'
os.environ['PATH'] = os.environ['JAVA_HOME'] + '/bin:' + os.environ['SPARK_HOME'] + '/bin:' + os.environ['PATH']
# Verify the environment variables are set
print(os.environ['JAVA_HOME'])
print(os.environ['SPARK_HOME'])
print(os.environ['PATH'])

/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home
/opt/homebrew/opt/apache-spark
/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home/bin:/opt/homebrew/opt/apache-spark/bin:/Users/nenadbozinovic/Documents/blog/venv_blog/bin:/Users/nenadbozinovic/.pyenv/shims:/Users/nenadbozinovic/.pyenv/bin:/Library/Frameworks/Python.framework/Versions/3.11/bin:/Library/Frameworks/Python.framework/Versions/3.12/bin:/Library/Frameworks/Python.framework/Versions/3.9/bin:/Library/Frameworks/Python.framework/Versions/3.10/bin:/opt/homebrew/bin:/opt/homebrew/sbin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/Library/Apple/usr/bin:/Library/TeX/texbin:/Applications/quarto/bin
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example")
spark

<pyspark.sql.session.SparkSession.Builder at 0x126396ba0>

The repr shows that spark is still a Builder, because the chain is missing the final getOrCreate() call, so the DataFrame test fails:

data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["Name", "Value"])
df.show()

AttributeError: 'Builder' object has no attribute 'createDataFrame'
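With the environment variables now set, finishing the builder chain should return a real session, and the same test then runs:

spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.createDataFrame([("Alice", 1), ("Bob", 2), ("Cathy", 3)], ["Name", "Value"])
df.show()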