001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.FlowProcessWrapper;
030import cascading.property.ConfigDef;
031
032/**
033 *
034 */
035public class ElementFlowProcess extends FlowProcessWrapper
036  {
037  private final ConfigDef configDef;
038  private final ConfigDef.Getter getter;
039
040  public ElementFlowProcess( FlowProcess flowProcess, ConfigDef configDef )
041    {
042    super( flowProcess );
043
044    this.configDef = configDef;
045    this.getter = new ConfigDef.Getter()
046      {
047      @Override
048      public String update( String key, String value )
049        {
050        String result = get( key );
051
052        if( result == null )
053          return value;
054
055        if( result.contains( value ) )
056          return result;
057
058        return result + "," + value;
059        }
060
061      @Override
062      public String get( String key )
063        {
064        Object value = getDelegate().getProperty( key );
065
066        if( value == null )
067          return null;
068
069        return value.toString();
070        }
071      };
072    }
073
074  @Override
075  public Object getProperty( String key )
076    {
077    return configDef.apply( key, getter );
078    }
079
080  @Override
081  public Collection<String> getPropertyKeys()
082    {
083    Set<String> keys = new HashSet<String>();
084
085    keys.addAll( getDelegate().getPropertyKeys() );
086    keys.addAll( configDef.getAllKeys() );
087
088    return Collections.unmodifiableSet( keys );
089    }
090  }