Changing aws credentials in hadoop configuration for pyspark during runtime after initialization of spark...











up vote
0
down vote

favorite
1












I've looked around on Stack Overflow for solutions to related problems, but this one seems to be fairly unique. For context, I need to refresh my AWS security credentials every hour due to company procedures, and I'm struggling to pass the refreshed credentials to Spark. Everything works fine in the first hour (I can access and read tables from S3, etc.), but I'm unable to successfully change my AWS credentials after the first hour is up and the credentials are refreshed.



Once I refresh my AWS credentials, here is the code that I'm using to update Spark so that it uses the new credentials:



# Take the SparkContext from `spark`, which is not defined in this snippet --
# presumably an already-initialised SparkSession (TODO confirm).
sc = spark.sparkContext

def getAWSKeys(profile):
    """Read one profile's credentials from ``~/.aws/credentials``.

    The file is parsed as INI; each ``[section]`` is treated as a profile.

    Args:
        profile: Name of the section to read.

    Returns:
        A dict with the keys ``aws_access_key_id``, ``aws_secret_access_key``
        and ``aws_session_token`` when the section exists. An empty dict when
        it does not (a missing or unreadable file yields no sections, so it
        also ends up here).

    Raises:
        ConfigParser.NoOptionError: If the section exists but lacks one of
            the three options.
    """
    # expanduser("~") honours $HOME but, unlike os.getenv("HOME"), does not
    # return None when the variable is unset, so os.path.join cannot fail
    # with a TypeError here.
    credentials_path = os.path.join(
        os.path.expanduser("~"), ".aws", "credentials")

    # Assumes `ConfigParser` is the config-parser module as imported elsewhere
    # in the file (the import is not visible in this snippet).
    parser = ConfigParser.ConfigParser()
    parser.read(credentials_path)

    if profile not in parser.sections():
        return {}
    return {
        option: parser.get(profile, option)
        for option in (
            "aws_access_key_id",
            "aws_secret_access_key",
            "aws_session_token",
        )
    }


# Load the credentials for `profile` (a name defined outside this snippet).
awsKeys = getAWSKeys(profile)

# One row per Hadoop property, in the order it is written:
# (property suffix, awsKeys field or None, literal value or None).
hadoop_settings = (
    ("awsAccessKeyId", "aws_access_key_id", None),
    ("awsSecretAccessKey", "aws_secret_access_key", None),
    ("session.token", "aws_session_token", None),
    ("enableServerSideEncryption", None, "true"),
    ("access.key", "aws_access_key_id", None),
    ("secret.key", "aws_secret_access_key", None),
    ("endpoint", None, "s3.us-east-1.amazonaws.com"),
)

# The same seven properties are written under each of the fs.s3, fs.s3a and
# fs.s3n prefixes.
# NOTE(review): nothing here shows whether filesystem clients that already
# exist re-read these values -- confirm against the Hadoop/EMRFS docs.
for scheme in ("s3", "s3a", "s3n"):
    for suffix, cred_field, literal in hadoop_settings:
        sc._jsc.hadoopConfiguration().set(
            "fs.{}.{}".format(scheme, suffix),
            awsKeys[cred_field] if cred_field is not None else literal)

# Set the enableV4 system property for all three service names.
for service in ("s3", "s3n", "s3a"):
    sc.setSystemProperty(
        "com.amazonaws.services.{}.enableV4".format(service), "true")

# Disabled in the original and kept disabled here: setting
# "fs.s3.aws.credentials.provider" to
# "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider".

# Mirror the credentials into this Python process's environment as well.
for env_name, cred_field in (
        ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
        ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
        ("AWS_SESSION_TOKEN", "aws_session_token")):
    os.environ[env_name] = awsKeys[cred_field]


I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:



Py4JJavaError                             Traceback (most recent call last)
<ipython-input-57-674174eca978> in <module>()
3 table = (
4 spark.read.option("delimiter", "|")
----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
6 .select("col1", "col2", "col3", "col4")
7 )

/usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling o12923.csv.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)


To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error once I refresh the AWS credentials. I've attempted to add the new AWS credentials to Spark, but nothing that I've tried has worked.










share|improve this question


























    up vote
    0
    down vote

    favorite
    1












    I've looked around on Stack Overflow for solutions to related problems, but this one seems to be fairly unique. For context, I need to refresh my AWS security credentials every hour due to company procedures, and I'm struggling to pass the refreshed credentials to Spark. Everything works fine in the first hour (I can access and read tables from S3, etc.), but I'm unable to successfully change my AWS credentials after the first hour is up and the credentials are refreshed.



    Once I refresh my AWS credentials, here is the code that I'm using to update Spark so that it uses the new credentials:



    # Take the SparkContext from `spark`, which is not defined in this snippet --
    # presumably an already-initialised SparkSession (TODO confirm).
    sc = spark.sparkContext

    def getAWSKeys(profile):
        """Read one profile's credentials from ``~/.aws/credentials``.

        The file is parsed as INI; each ``[section]`` is treated as a profile.

        Args:
            profile: Name of the section to read.

        Returns:
            A dict with the keys ``aws_access_key_id``,
            ``aws_secret_access_key`` and ``aws_session_token`` when the
            section exists. An empty dict when it does not (a missing or
            unreadable file yields no sections, so it also ends up here).

        Raises:
            ConfigParser.NoOptionError: If the section exists but lacks one
                of the three options.
        """
        # expanduser("~") honours $HOME but, unlike os.getenv("HOME"), does
        # not return None when the variable is unset, so os.path.join cannot
        # fail with a TypeError here.
        credentials_path = os.path.join(
            os.path.expanduser("~"), ".aws", "credentials")

        # Assumes `ConfigParser` is the config-parser module as imported
        # elsewhere in the file (the import is not visible in this snippet).
        parser = ConfigParser.ConfigParser()
        parser.read(credentials_path)

        if profile not in parser.sections():
            return {}
        return {
            option: parser.get(profile, option)
            for option in (
                "aws_access_key_id",
                "aws_secret_access_key",
                "aws_session_token",
            )
        }


    # Load the credentials for `profile` (a name defined outside this snippet).
    awsKeys = getAWSKeys(profile)

    # One row per Hadoop property, in the order it is written:
    # (property suffix, awsKeys field or None, literal value or None).
    hadoop_settings = (
        ("awsAccessKeyId", "aws_access_key_id", None),
        ("awsSecretAccessKey", "aws_secret_access_key", None),
        ("session.token", "aws_session_token", None),
        ("enableServerSideEncryption", None, "true"),
        ("access.key", "aws_access_key_id", None),
        ("secret.key", "aws_secret_access_key", None),
        ("endpoint", None, "s3.us-east-1.amazonaws.com"),
    )

    # The same seven properties are written under each of the fs.s3, fs.s3a
    # and fs.s3n prefixes.
    # NOTE(review): nothing here shows whether filesystem clients that already
    # exist re-read these values -- confirm against the Hadoop/EMRFS docs.
    for scheme in ("s3", "s3a", "s3n"):
        for suffix, cred_field, literal in hadoop_settings:
            sc._jsc.hadoopConfiguration().set(
                "fs.{}.{}".format(scheme, suffix),
                awsKeys[cred_field] if cred_field is not None else literal)

    # Set the enableV4 system property for all three service names.
    for service in ("s3", "s3n", "s3a"):
        sc.setSystemProperty(
            "com.amazonaws.services.{}.enableV4".format(service), "true")

    # Disabled in the original and kept disabled here: setting
    # "fs.s3.aws.credentials.provider" to
    # "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider".

    # Mirror the credentials into this Python process's environment as well.
    for env_name, cred_field in (
            ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
            ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
            ("AWS_SESSION_TOKEN", "aws_session_token")):
        os.environ[env_name] = awsKeys[cred_field]


    I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:



    Py4JJavaError                             Traceback (most recent call last)
    <ipython-input-57-674174eca978> in <module>()
    3 table = (
    4 spark.read.option("delimiter", "|")
    ----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
    6 .select("col1", "col2", "col3", "col4")
    7 )

    /usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
    408 if isinstance(path, basestring):
    409 path = [path]
    --> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    411
    412 @since(1.5)

    /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
    1131 answer = self.gateway_client.send_command(command)
    1132 return_value = get_return_value(
    -> 1133 answer, self.gateway_client, self.target_id, self.name)
    1134
    1135 for temp_arg in temp_args:

    /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    61 def deco(*a, **kw):
    62 try:
    ---> 63 return f(*a, **kw)
    64 except py4j.protocol.Py4JJavaError as e:
    65 s = e.java_exception.toString()

    /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317 raise Py4JJavaError(
    318 "An error occurred while calling {0}{1}{2}.\n".
    --> 319 format(target_id, ".", name), value)
    320 else:
    321 raise Py4JError(

    Py4JJavaError: An error occurred while calling o12923.csv.
    : com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
    at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)


    To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error once I refresh the AWS credentials. I've attempted to add the new AWS credentials to Spark, but nothing that I've tried has worked.










    share|improve this question
























      up vote
      0
      down vote

      favorite
      1









      up vote
      0
      down vote

      favorite
      1






      1





      I've looked around on Stack Overflow for solutions to related problems, but this one seems to be fairly unique. For context, I need to refresh my AWS security credentials every hour due to company procedures, and I'm struggling to pass the refreshed credentials to Spark. Everything works fine in the first hour (I can access and read tables from S3, etc.), but I'm unable to successfully change my AWS credentials after the first hour is up and the credentials are refreshed.



      Once I refresh my AWS credentials, here is the code that I'm using to update Spark so that it uses the new credentials:



      # Take the SparkContext from `spark`, which is not defined in this snippet --
      # presumably an already-initialised SparkSession (TODO confirm).
      sc = spark.sparkContext

      def getAWSKeys(profile):
          """Read one profile's credentials from ``~/.aws/credentials``.

          The file is parsed as INI; each ``[section]`` is treated as a
          profile.

          Args:
              profile: Name of the section to read.

          Returns:
              A dict with the keys ``aws_access_key_id``,
              ``aws_secret_access_key`` and ``aws_session_token`` when the
              section exists. An empty dict when it does not (a missing or
              unreadable file yields no sections, so it also ends up here).

          Raises:
              ConfigParser.NoOptionError: If the section exists but lacks one
                  of the three options.
          """
          # expanduser("~") honours $HOME but, unlike os.getenv("HOME"), does
          # not return None when the variable is unset, so os.path.join
          # cannot fail with a TypeError here.
          credentials_path = os.path.join(
              os.path.expanduser("~"), ".aws", "credentials")

          # Assumes `ConfigParser` is the config-parser module as imported
          # elsewhere in the file (the import is not visible in this snippet).
          parser = ConfigParser.ConfigParser()
          parser.read(credentials_path)

          if profile not in parser.sections():
              return {}
          return {
              option: parser.get(profile, option)
              for option in (
                  "aws_access_key_id",
                  "aws_secret_access_key",
                  "aws_session_token",
              )
          }


      # Load the credentials for `profile` (a name defined outside this snippet).
      awsKeys = getAWSKeys(profile)

      # One row per Hadoop property, in the order it is written:
      # (property suffix, awsKeys field or None, literal value or None).
      hadoop_settings = (
          ("awsAccessKeyId", "aws_access_key_id", None),
          ("awsSecretAccessKey", "aws_secret_access_key", None),
          ("session.token", "aws_session_token", None),
          ("enableServerSideEncryption", None, "true"),
          ("access.key", "aws_access_key_id", None),
          ("secret.key", "aws_secret_access_key", None),
          ("endpoint", None, "s3.us-east-1.amazonaws.com"),
      )

      # The same seven properties are written under each of the fs.s3, fs.s3a
      # and fs.s3n prefixes.
      # NOTE(review): nothing here shows whether filesystem clients that
      # already exist re-read these values -- confirm against the
      # Hadoop/EMRFS docs.
      for scheme in ("s3", "s3a", "s3n"):
          for suffix, cred_field, literal in hadoop_settings:
              sc._jsc.hadoopConfiguration().set(
                  "fs.{}.{}".format(scheme, suffix),
                  awsKeys[cred_field] if cred_field is not None else literal)

      # Set the enableV4 system property for all three service names.
      for service in ("s3", "s3n", "s3a"):
          sc.setSystemProperty(
              "com.amazonaws.services.{}.enableV4".format(service), "true")

      # Disabled in the original and kept disabled here: setting
      # "fs.s3.aws.credentials.provider" to
      # "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider".

      # Mirror the credentials into this Python process's environment as well.
      for env_name, cred_field in (
              ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
              ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
              ("AWS_SESSION_TOKEN", "aws_session_token")):
          os.environ[env_name] = awsKeys[cred_field]


      I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:



      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-57-674174eca978> in <module>()
      3 table = (
      4 spark.read.option("delimiter", "|")
      ----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
      6 .select("col1", "col2", "col3", "col4")
      7 )

      /usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
      408 if isinstance(path, basestring):
      409 path = [path]
      --> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
      411
      412 @since(1.5)

      /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
      1131 answer = self.gateway_client.send_command(command)
      1132 return_value = get_return_value(
      -> 1133 answer, self.gateway_client, self.target_id, self.name)
      1134
      1135 for temp_arg in temp_args:

      /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
      61 def deco(*a, **kw):
      62 try:
      ---> 63 return f(*a, **kw)
      64 except py4j.protocol.Py4JJavaError as e:
      65 s = e.java_exception.toString()

      /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
      317 raise Py4JJavaError(
      318 "An error occurred while calling {0}{1}{2}.\n".
      --> 319 format(target_id, ".", name), value)
      320 else:
      321 raise Py4JError(

      Py4JJavaError: An error occurred while calling o12923.csv.
      : com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
      at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
      at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
      at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
      at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
      at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.immutable.List.flatMap(List.scala:344)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
      at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
      at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:280)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:214)
      at java.lang.Thread.run(Thread.java:748)


      To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error when I refresh the aws credentials. I've attempted to add these new aws credentials to spark, but nothing that I've tried has worked.










      share|improve this question













      I've looked around on Stack Overflow for solutions to related problems, but it seems that this one is fairly unique. For context, I need to refresh my AWS security credentials every hour due to company procedures, and I'm struggling to pass the newly refreshed credentials to Spark. Everything works fine in the first hour (I can access and read tables from S3, etc.), but I'm unable to change my AWS credentials once the first hour is up and the credentials have been refreshed.



      Once I refresh my AWS credentials, here is the code that I'm using to make Spark use the new credentials:



      # Keep a handle on the SparkContext of the existing `spark` session; the
      # statements that follow read and write its Hadoop configuration.
      sc = spark.sparkContext

      def getAWSKeys(profile):
          """Read one profile's session credentials from ~/.aws/credentials.

          Args:
              profile: Name of the section to look up in the credentials file.

          Returns:
              A dict with the keys "aws_access_key_id", "aws_secret_access_key"
              and "aws_session_token". An empty dict is returned when the file
              has no section named ``profile`` (a missing or unreadable file is
              skipped by ``ConfigParser.read`` and therefore ends up here too).

          Raises:
              ConfigParser.NoOptionError: The section exists but lacks one of
                  the three options.
          """
          # expanduser() still resolves a home directory when $HOME is unset,
          # whereas os.getenv("HOME") would hand None to os.path.join().
          credentials_path = os.path.join(
              os.path.expanduser("~"), ".aws", "credentials")

          parser = ConfigParser.ConfigParser()
          parser.read(credentials_path)
          if not parser.has_section(profile):
              return {}

          # raw=True: credentials are opaque strings, so a literal "%" in a
          # value must not be interpreted as interpolation syntax.
          return {
              option: parser.get(profile, option, raw=True)
              for option in (
                  "aws_access_key_id",
                  "aws_secret_access_key",
                  "aws_session_token",
              )
          }


      # Load the refreshed credentials once and reuse them for every setting below.
      awsKeys = getAWSKeys(profile)
      access_key = awsKeys["aws_access_key_id"]
      secret_key = awsKeys["aws_secret_access_key"]
      session_token = awsKeys["aws_session_token"]

      hadoop_conf = sc._jsc.hadoopConfiguration()

      # The same seven properties are written under each of the three S3 schemes.
      for scheme in ("s3", "s3a", "s3n"):
          prefix = "fs.%s." % scheme
          hadoop_conf.set(prefix + "awsAccessKeyId", access_key)
          hadoop_conf.set(prefix + "awsSecretAccessKey", secret_key)
          hadoop_conf.set(prefix + "session.token", session_token)
          hadoop_conf.set(prefix + "enableServerSideEncryption", "true")
          hadoop_conf.set(prefix + "access.key", access_key)
          hadoop_conf.set(prefix + "secret.key", secret_key)
          hadoop_conf.set(prefix + "endpoint", "s3.us-east-1.amazonaws.com")

      # JVM system properties, set in the order s3, s3n, s3a.
      for scheme in ("s3", "s3n", "s3a"):
          sc.setSystemProperty(
              "com.amazonaws.services.%s.enableV4" % scheme, "true")

      # sc._jsc.hadoopConfiguration().set("fs.s3.aws.credentials.provider",
      # "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")

      # Mirror the credentials into this Python process's environment.
      for env_name, env_value in (
          ("AWS_ACCESS_KEY_ID", access_key),
          ("AWS_SECRET_ACCESS_KEY", secret_key),
          ("AWS_SESSION_TOKEN", session_token),
      ):
          os.environ[env_name] = env_value


      I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:



      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-57-674174eca978> in <module>()
      3 table = (
      4 spark.read.option("delimiter", "|")
      ----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
      6 .select("col1", "col2", "col3", "col4")
      7 )

      /usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
      408 if isinstance(path, basestring):
      409 path = [path]
      --> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
      411
      412 @since(1.5)

      /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
      1131 answer = self.gateway_client.send_command(command)
      1132 return_value = get_return_value(
      -> 1133 answer, self.gateway_client, self.target_id, self.name)
      1134
      1135 for temp_arg in temp_args:

      /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
      61 def deco(*a, **kw):
      62 try:
      ---> 63 return f(*a, **kw)
      64 except py4j.protocol.Py4JJavaError as e:
      65 s = e.java_exception.toString()

      /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
      317 raise Py4JJavaError(
      318 "An error occurred while calling {0}{1}{2}.\n".
      --> 319 format(target_id, ".", name), value)
      320 else:
      321 raise Py4JError(

      Py4JJavaError: An error occurred while calling o12923.csv.
      : com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
      at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
      at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
      at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
      at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
      at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
      at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
      at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
      at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.immutable.List.flatMap(List.scala:344)
      at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
      at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
      at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
      at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:280)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:214)
      at java.lang.Thread.run(Thread.java:748)


      To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error once I refresh the AWS credentials. I've attempted to pass the new AWS credentials to Spark, but nothing that I've tried has worked.







      python amazon-web-services apache-spark amazon-s3 pyspark






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 20 at 18:00









      nattyji

      31




      31
























          1 Answer
          1






          active

          oldest

          votes

















          up vote
          0
          down vote



          accepted










          I can't see an easy way of doing this, as those credentials get bound to the filesystem and then frozen.



          If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider, which provides the credentials for AWS calls. The default chain is something like: Spark config, env vars, GET request to the EC2 metadata service. You could add a new provider which somehow picks up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though... hard work.



          The other thing to know is that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the lifetime of the role you are assigned to 12 hours, you may just be able to get through the day.



          Try that first.



          PS: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid it.






          share|improve this answer





















            Your Answer






            // Nested loader callbacks: StackExchange.snippets.init() is only reached
            // from inside the "editor" -> "externalEditor" -> "snippets" callbacks.
            // NOTE(review): presumably each name is a lazily loaded script bundle and
            // "code-snippets" labels this registration; confirm against the loader.
            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            // Callback registered with StackExchange.ready: sets up the tag renderer
            // and then creates the answer editor via StackExchange.prepareEditor.
            StackExchange.ready(function() {
                // "".split(" ") evaluates to [""], so all tag lists here hold a
                // single empty string.
                var channelOptions = {
                    tags: "".split(" "),
                    id: "1"
                };
                initTagRenderer("".split(" "), "".split(" "), channelOptions);

                StackExchange.using("externalEditor", function() {
                    // Have to fire editor after snippets, if snippets enabled
                    if (StackExchange.settings.snippets.snippetsEnabled) {
                        StackExchange.using("snippets", function() {
                            createEditor();
                        });
                    }
                    else {
                        createEditor();
                    }
                });

                // Function declarations are hoisted, so the callbacks above can
                // call createEditor even though it is defined after them.
                function createEditor() {
                    StackExchange.prepareEditor({
                        heartbeatType: 'answer',
                        convertImagesToLinks: true,
                        noModals: true,
                        showLowRepImageUploadWarning: true,
                        reputationToPostImages: 10,
                        bindNavPrevention: true,
                        postfix: "",
                        imageUploader: {
                            // The \u003c / \u003e escapes stand for "<" and ">"; the
                            // inner quotes must stay escaped or the literal ends early.
                            brandingHtml: "Powered by \u003ca class=\"icon-imgur-white\" href=\"https://imgur.com/\"\u003e\u003c/a\u003e",
                            contentPolicyHtml: "User contributions licensed under \u003ca href=\"https://creativecommons.org/licenses/by-sa/3.0/\"\u003ecc by-sa 3.0 with attribution required\u003c/a\u003e \u003ca href=\"https://stackoverflow.com/legal/content-policy\"\u003e(content policy)\u003c/a\u003e",
                            allowUrls: true
                        },
                        onDemand: true,
                        discardSelector: ".discard-answer"
                        ,immediatelyShowMarkdownHelp:true
                    });
                }
            });














            draft saved

            draft discarded


















            // Registers a ready callback that calls StackExchange.openid.initPostLogin
            // with the '.new-post-login' selector, the URL-encoded address of this
            // question's #new-answer anchor, and the string 'question_page'.
            // NOTE(review): the meaning of each argument is not visible here; confirm
            // against the StackExchange.openid API.
            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53398881%2fchanging-aws-credentials-in-hadoop-configuration-for-pyspark-during-runtime-afte%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes








            up vote
            0
            down vote



            accepted










            I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.



            If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work



            The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.



            Try that first.



            ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid






            share|improve this answer

























              up vote
              0
              down vote



              accepted










              I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.



              If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work



              The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.



              Try that first.



              ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid






              share|improve this answer























                up vote
                0
                down vote



                accepted







                up vote
                0
                down vote



                accepted






                I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.



                If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work



                The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.



                Try that first.



                ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid






                share|improve this answer












                I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.



                If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work



                The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.



                Try that first.



                ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid







                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 22 at 14:03









                Steve Loughran

                5,00511417




                5,00511417






























                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.





                    Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


                    Please pay close attention to the following guidance:


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    // Registers a ready callback that calls StackExchange.openid.initPostLogin
                    // with the '.new-post-login' selector, the URL-encoded address of this
                    // question's #new-answer anchor, and the string 'question_page'.
                    // NOTE(review): the meaning of each argument is not visible here; confirm
                    // against the StackExchange.openid API.
                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53398881%2fchanging-aws-credentials-in-hadoop-configuration-for-pyspark-during-runtime-afte%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    What visual should I use to simply compare current year value vs last year in Power BI desktop

                    Alexandru Averescu

                    Trompette piccolo