PySpark: read files from S3 and parallelize the file list
I am a newbie to Apache Spark and PySpark. I have a use case where I have to read multiple files from different folders in S3 and then process the file contents in parallel. I have tried various ways, one of which is shown below, but I keep hitting the same issue:

TypeError: can't pickle thread.lock objects

How can I process the S3 files in parallel and read the body of each object?
Here is the code snippet after editing:
import boto3

s3_client = boto3.client('s3')

def f(key):
    # the driver-side client is captured in this closure and cannot be pickled
    return s3_client.get_object(Bucket='bucket', Key=key)['Body'].read()

data_rdd = sc.parallelize(keys_list).map(lambda key: f(key))
I did not understand how to initialize the s3 client inside the lambda body. This is the pattern that fails:
s3_client = boto3.client('s3')
...map(lambda key: s3_client.get_object(Bucket="my_bucket", Key=key))
You could try creating the client inside the lambda instead:
...map(lambda key: boto3.client('s3').get_object(Bucket="my_bucket", Key=key))
If you need to configure the boto3 client, it might be best to use a named function:
def f(key):
    import boto3
    s3_client = boto3.client('s3')
    # ... more s3_client config ...
    return s3_client.get_object(Bucket="my_bucket", Key=key)

...map(f)
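If creating a new client for every key proves too slow, a common refinement is one client per partition via mapPartitions. A minimal sketch, assuming keys_list holds plain S3 key strings and "my_bucket" is a placeholder:

import boto3

def read_partition(keys):
    # one boto3 client per partition instead of one per key
    s3_client = boto3.client('s3')
    for key in keys:
        yield s3_client.get_object(Bucket="my_bucket", Key=key)['Body'].read()

data_rdd = sc.parallelize(keys_list).mapPartitions(read_partition)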
See also questions close to this topic
psychopy - error when trying to set non-scalar variable via conditions file
In PsychoPy v1.85.1 on Windows I am configuring the 'polygon' stimulus object. I am trying to set the height attribute via a csv conditions file, but I get an error message saying "Invalid parameter. Single numbers are not accepted".
In the polygon's pop-up UI 'size' box I enter: $height
In the csv-file I have a 'height' column. Each row has values such as: (1.5, 0)
PsychoPy gives the error message:
File "C:\Program Files (x86)\PsychoPy2\lib\site-packages\psychopy \visual\basevisual.py", line 1312, in pos self.__dict__['pos'] = val2array(value, False, False) File "C:\Program Files (x86)\PsychoPy2\lib\site-packages\psychopy\tools\arraytools.py", line 176, in val2array raise ValueError(msg % str(length)) ValueError: Invalid parameter. Single numbers are not accepted. Should be tuple/list/array of length 2
Troubleshooting - Misc
- Scalar variables in other csv columns work fine, so PsychoPy does connect with the csv file.
- Tried xlsx format.
- Tried entering the values without parentheses and with square brackets.
Troubleshooting - Running the code outside of PsychoPy
I go to the arraytools.py file and find the relevant code snippet. I paste it into a Python notebook (although it is Python 3.3) and add some print statements for debugging:
# Copied code snippet from
# C:\Program Files (x86)\PsychoPy2\lib\site-packages\psychopy\tools\arraytools.py
import numpy

def val2array(value, withNone=True, withScalar=True, length=2):
    """Helper function: converts different input to a numpy array.

    Raises informative error messages if input is invalid.

    withNone: True/False. should 'None' be passed?
    withScalar: True/False. is a scalar an accepted input? Will be
        converted to array of this scalar
    length: False / 2 / 3. Number of elements input should have or be
        converted to. Might be False (do not accept arrays or convert to such)
    """
    if value is None:
        if withNone:
            return None
        else:
            raise ValueError('Invalid parameter. None is not accepted as '
                             'value.')
    value = numpy.array(value, float)
    print("value:", value)  # I ADDED
    print("value.shape:", value.shape)  # I ADDED
    print("numpy.product(value.shape):", numpy.product(value.shape))  # I ADDED
    if numpy.product(value.shape) == 1:  # MY COMMENT: WHY DOES THIS EVALUATE TRUE?
        if withScalar:
            # e.g. 5 becomes array([5.0, 5.0, 5.0]) for length=3
            return numpy.repeat(value, length)
        else:
            msg = ('Invalid parameter. Single numbers are not accepted. '
                   'Should be tuple/list/array of length %s')
            raise ValueError(msg % str(length))
    elif value.shape[-1] == length:
        return numpy.array(value, float)
    else:
        msg = 'Invalid parameter. Should be length %s but got length %s.'
        raise ValueError(msg % (str(length), str(len(value))))
I test it by entering a value and then running the function:
# Run the function
value = (1.5, 0.0)
val2array(value, False, False, length=2)
Results below. Seems to work fine:
value: [ 1.5  0. ]
value.shape: (2,)
numpy.product(value.shape): 2
Out: array([ 1.5,  0. ])
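Further debugging: the error path does trigger for a scalar input, since numpy.array(1.5, float) has shape () and numpy.product(()) is 1.0. So inside PsychoPy the value arriving from the conditions file is apparently a single number rather than the (1.5, 0) pair:

# scalar input reproduces the error seen in PsychoPy
val2array(1.5, False, False, length=2)
# ValueError: Invalid parameter. Single numbers are not accepted.
# Should be tuple/list/array of length 2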
Any idea what I am doing wrong?
pretty printing sympy array
I am trying to create 2-dim arrays in sympy, but the output is not rendered the way I want. Here is my code:
from sympy import *
from sympy.tensor.array import Array

init_printing()
x = symbols('x')
vec1 = Array([[x, x**2], [x**3, x**4]])
print(vec1)
I am getting this output:
[[x, x**2], [x**3, x**4]]
But I want the output pretty printed like the arrays shown at http://docs.sympy.org/latest/tutorial/printing.html. How can I do that? I am using Python 3.6 with Spyder 3.4 on Windows.
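A likely fix, assuming the goal is the rendered output from the tutorial: print() always goes through str(), so use sympy's pprint instead (or make vec1 the last expression of a console cell):

from sympy import symbols, pprint
from sympy.tensor.array import Array

x = symbols('x')
vec1 = Array([[x, x**2], [x**3, x**4]])
pprint(vec1)  # renders the 2x2 array with superscript powers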
Creating a column in a pandas dataframe containing the number of elements in the group (groupby)
I'm analysing a large dataset containing a variable number of observations per subject (ranging from 1 occurrence to 26 occurrences). As I need to analyse the time between events, the subjects with only one occurrence are non-informative.
Previously, while working in Stata, I would assign a variable (called e.g. total) using Stata code:
by idnummer, sort: gen total=_N
This way every line/subject has a variable 'total', and I can eliminate all subjects with total=1.
I have been trying with agg functions and with size, but I end up with 'NaN'...
PS: using the "similar questions" on the side, I have found the answer to my own question:
df['total'] = df.groupby('idnummer')['sequence'].transform('max')
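Note that this relies on a 'sequence' column that numbers occurrences within each subject. The direct pandas equivalent of Stata's _N is transform('size'), which needs no such column — a minimal sketch with made-up data:

import pandas as pd

df = pd.DataFrame({'idnummer': [1, 1, 2, 3, 3, 3]})
df['total'] = df.groupby('idnummer')['idnummer'].transform('size')
df = df[df['total'] > 1]  # drop subjects observed only once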
Why does Spark application fail with "Exception in thread "main" java.lang.NoClassDefFoundError: ...StringDeserializer"?
I am developing a Spark application that listens to a Kafka stream using Spark and Java.
I use kafka_2.10-0.10.2.1.
I have set various parameters for the Kafka properties.
My application compiles fine, but when I submit it, it fails with the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
I do use value.deserializer, so it's indeed related to how I wrote my application.
Various Maven dependencies used in my pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
I have tried updating the versions of Spark Streaming and Kafka, but could not find much anywhere.
How to handle dynamic schema of xml dataset in spark?
I am reading xml data, which we get once a month, in Spark using the Databricks api as below:
val xmlReader = new XmlReader
val empDf = xmlReader.withRowTag("employee").xmlRdd(sqlContext, XmlFileUtil.getRdd(sparkContext, xmlLocation, "UTF-8", "catalog"))
empDf.registerTempTable("empTable")
sqlContext.sql("select name,salary,company,skillSet from empTable")
Now my problem is that the schema of the dataset is not fixed: in the next month's dataset the company and skillSet fields could be structs or arrays, and then my SQL query would break, because arrays need to be exploded first, while for a struct we need to access the value of the tag.
I have written large queries to fetch data, but now I realize that they can break badly. Please suggest a fix.
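One way to cope, sketched in PySpark for consistency with the rest of this page (the helper name and the ".value" sub-tag are assumptions, not part of the Databricks XML API): inspect each field's runtime type and normalize it before querying.

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode

def normalize_field(df, name):
    # Explode arrays and unwrap structs so downstream SQL sees a flat column.
    dtype = df.schema[name].dataType
    if isinstance(dtype, ArrayType):
        return df.withColumn(name, explode(col(name)))
    if isinstance(dtype, StructType):
        return df.withColumn(name, col(name + ".value"))  # assumed sub-tag
    return df

for field in ("company", "skillSet"):
    empDf = normalize_field(empDf, field)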
Spark web UI redirects to Kubernetes internal IPs
A Spark cluster is set up on minikube Kubernetes; some code is here: CrashLoopBackOff in spark cluster in kubernetes: nohup: can't execute '--': No such file or directory
With kubectl port-forward spark-master-498980536-kfgg8 8080:8080 I can access the main Spark web UI, but when I click some links to get more details, they redirect to Kubernetes internal IPs.
I have searched the internet for a long time and am still stuck on this issue. I wonder whether I have the right configuration.
How can I implement Point-in-Polygon in Magellan in PySpark?
I want to define a point and a Polygon in Magellan in Pyspark and see if the point is within the polygon. How can I do this?
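For reference while sorting out the Magellan API, the underlying test itself is small. This is not Magellan's API — just the classic ray-casting algorithm in plain Python, usable e.g. inside a UDF:

def point_in_polygon(x, y, polygon):
    # Ray casting: count edge crossings of a horizontal ray from (x, y).
    # polygon is a list of (x, y) vertex tuples.
    inside = False
    j = len(polygon) - 1
    for i in range(len(polygon)):
        xi, yi = polygon[i]
        xj, yj = polygon[j]
        if (yi > y) != (yj > y) and x < (xj - xi) * (y - yi) / (yj - yi) + xi:
            inside = not inside
        j = i
    return inside

point_in_polygon(0.5, 0.5, [(0, 0), (1, 0), (1, 1), (0, 1)])  # True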
How to write a dataset object to Excel in Spark Java?
I am reading an excel file using the com.crealytics.spark.excel package. Below is the code to read an excel file in Spark Java:
Dataset<Row> SourcePropertSet = sqlContext.read()
    .format("com.crealytics.spark.excel")
    .option("location", "D:\\5Kto10K.xlsx")
    .option("useHeader", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .option("inferSchema", "true")
    .option("addColorColumns", "false")
    .load("com.databricks.spark.csv");
Then I tried the same (com.crealytics.spark.excel) package to write the dataset object to an excel file in Spark Java:
SourcePropertSet.write()
    .format("com.crealytics.spark.excel")
    .option("useHeader", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .option("inferSchema", "true")
    .option("addColorColumns", "false")
    .save("D:\\resultset.xlsx");
But I am getting the error below:
java.lang.RuntimeException: com.crealytics.spark.excel.DefaultSource does not allow create table as select.
I also tried the org.zuinnote.spark.office.excel package; below is the code for that:
SourcePropertSet.write()
    .format("org.zuinnote.spark.office.excel")
    .option("write.locale.bcp47", "de")
    .save("D:\\result");
I have added the following dependencies in my pom.xml:
<dependency>
    <groupId>com.github.zuinnote</groupId>
    <artifactId>hadoopoffice-fileformat</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>com.github.zuinnote</groupId>
    <artifactId>spark-hadoopoffice-ds_2.11</artifactId>
    <version>1.0.3</version>
</dependency>
But I am getting the error below:
java.lang.IllegalAccessError: tried to access method org.zuinnote.hadoop.office.format.mapreduce.ExcelFileOutputFormat.getSuffix(Ljava/lang/String;)Ljava/lang/String; from class org.zuinnote.spark.office.excel.ExcelOutputWriterFactory
Please help me write a dataset object to an excel file in Spark Java.
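If the Spark Excel writers keep failing, one workaround — sketched in PySpark for consistency with the rest of this page, and only viable when the result fits in driver memory — is to collect to pandas and write from there (requires the pandas and openpyxl packages; source_property_set is a hypothetical PySpark counterpart of SourcePropertSet):

# assumes source_property_set is a Spark DataFrame small enough to collect
pdf = source_property_set.toPandas()
pdf.to_excel("resultset.xlsx", index=False)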
rdd.filter() is not working properly for spark-2.0.1
I want to filter out the elements of an RDD matching a string value, as in:

est_rdd = est_rdd.filter(lambda kv: kv != name_to_filter)
However, I see the filtered element is still in est_rdd. In that case I need to repartition before the next step to clear it, but that is a time-consuming operation. How should I avoid repartitioning? Any help?
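A guess at the cause, since the RDD's contents aren't shown: if est_rdd holds (key, value) pairs, as the parameter name kv suggests, then kv != name_to_filter compares a tuple against a string and is always True, so nothing is ever dropped. Filtering on the key alone would be:

# hypothetical fix, assuming (key, value) pairs where name_to_filter is a key
est_rdd = est_rdd.filter(lambda kv: kv[0] != name_to_filter)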